Channels¶
Channels are mechanisms used in the agent to share and synchronize threads. They are basically a queue that supports two primitives:
chan_send.
chan_recv.
Both of them are based on a chan_msg
that is the unit transmitted in the queue.
These functions return the number of messages sent/received,
0 or 1,
with 0 usually being an error in the case of chan_send.
A chan_msg is:
struct chan_msg {
void (*fn)(struct chan_msg *);
void *param;
void *resp;
};
The field fn defines the function
that will attend the message on the receiver side
while param is a pointer to the input parameters
and resp is a pointer to the output response.
The field resp also defines the behaviour of the channel,
if it is NULL then it is considered an asynchronous message
but if it is different than NULL then
it is a synchronous message.
There are several versions the receiving function:
chan_recv: It blocks the thread until a message is received.
chan_tryrecv: It attends a pending message if any and otherwise it just returns without blocking.
chan_timedrecv: It blocks the thread until a message is received or a timeout expires.
It is important to notice that in both cases,
synchronous and asynchronous messages,
the message passed to chan_send is copied when
it is stored in the channel queue,
so the sender can use local variables for chan_msg
even in the case of asynchronous messages.
Beware that it only applies to the chan_msg struct itself
and not to the memory pointed to by params and resp.
For asynchronous messages, when resp is NULL,
care must be taken in allocating params
in the heap or statically, because these data should be
available to the agent running on another thread.
Semantic of synchronous messages¶
A synchronous channel follows the semantic of CSP,
where a communication in the channel implies a rendezvous
between chan_send and chan_recv.
The thread calling chan_send will block
until the other endpoint thread calls chan_recv
and it finishes the execution of the receiving function indicated in the message.
This rendezvous implies a synchronization between both threads
and it means
that the receiving thread can access the parameters in mutual exclusion.
This can be summarized by the Go lemma Share Memory By Communicating:
Don't communicate by sharing memory; share memory by communicating.
Channels allow passing references to data structures between threads. If this is considered passing around ownership of the data (the ability to read and write it), they become a powerful and expressive synchronization mechanism.
Semantic of asynchronous messages¶
When a message is sent using chan_send but
the field resp is NULL
then the message is just enqueued and sends returns immediately.
It means
that the life of parameters passed by params must be longer that
the function calling chan_send,
usually implying dynamic memory because
as the sender doesn't known when
the receiver will attend the message
only the receiver can free the memory allocated by the sender.
Example of a synchronous message¶
struct sum_params {
int *ary;
int n;
};
void
add(struct chan_msg *msg)
{
int i, n, *out = msg->resp;
struct sum_params *p = msg->params;
for (i = n = 0; i < p->n; i++)
n += p->ary[n];
*out = n;
}
void
sender(struct chan *ch, int *array, int size)
{
struct chan_msg msg;
struct sum_params par;
int sum;
par.ary = array
par.n = siz;
msg.fn = add;
msg.params = ∥
msg.resp = ∑
if (chan_send(&msg) != 1) {
perror("sender");
return;
}
printf("sum value=%d\n", ret);
}
void
receiver(struct chan *ch)
{
for (;;)
chan_recv(ch);
}
The thread calling sender will block
until the thread calling receiver finishes the execution of add
and it means
that receiver accesses the integer array in mutual exclusion
without race conditions.
Example of an asynchronous message¶
struct sum_params {
int *ary;
int n;
};
void
add(struct chan_msg *msg)
{
int i, n;
struct sum_params *p = msg->params;
for (i = n = 0; i < p->n; i++)
n += p->ary[n];
printf("sum value=%d\n", n);
free(p);
}
void
sender(struct chan *ch, int *array, int n)
{
struct chan_msg msg;
struct sum_params *par;
par = malloc(sizeof(struct sum_params));
if (!par) {
free(ary);
perror("sender");
}
par->ary = array
par->n = n;
msg.fn = add;
msg.params = ∥
msg.resp = NULL;
if (chan_send(&msg) != 1) {
perror("sender");
return;
}
}
void
receiver(struct chan *ch)
{
for (;;)
chan_recv(ch);
}
The thread calling sender returns from chan_send as soon
the request is stored in the internal queue of the channel
and it means
that it cannot free the memory pointed to by par
because it does not known when
the receiver thread will use it.
In the same way,
the thread calling sender does not know
when it can use the memory pointed by array without race conditions.
External mechanism are needed to ensure the mutual exclusion
of the sender thread has to assume that
the full ownership of the memory pointed to by array
was transferred to the thread calling chan_recv
until the end of life of that memory.