#include<assert.h>#include<stdio.h>#include<stdlib.h>#include<margo.h>staticconstintTOTAL_RPCS=4;staticintnum_rpcs=0;// declaration of an RPC handler.
staticvoidhello_world(hg_handle_th);DECLARE_MARGO_RPC_HANDLER(hello_world)intmain(intargc,char**argv){margo_instance_idmid=margo_init("tcp",MARGO_SERVER_MODE,0,-1);assert(mid);hg_addr_tmy_address;margo_addr_self(mid,&my_address);charaddr_str[128];size_taddr_str_size=128;margo_addr_to_string(mid,addr_str,&addr_str_size,my_address);margo_addr_free(mid,my_address);margo_set_log_level(mid,MARGO_LOG_INFO);margo_info(mid,"Server running at address %s",addr_str);// register the RPC handler in the Margo instance
hg_id_trpc_id=MARGO_REGISTER(mid,"hello",void,void,hello_world);// indicate that this RPC handler does not send a response back to the client.
margo_registered_disable_response(mid,rpc_id,HG_TRUE);margo_wait_for_finalize(mid);return0;}staticvoidhello_world(hg_handle_th){hg_return_tret;// inside an RPC handler, we can access
// the Margo instance using margo_hg_handle_get_instance
margo_instance_idmid=margo_hg_handle_get_instance(h);margo_info(mid,"Hello World!");num_rpcs+=1;// must call margo_destroy on the hg_handle_t argument
// it is being passed, after we are done using it.
ret=margo_destroy(h);assert(ret==HG_SUCCESS);if(num_rpcs==TOTAL_RPCS){margo_finalize(mid);}}DEFINE_MARGO_RPC_HANDLER(hello_world)
#include<assert.h>#include<stdio.h>#include<stdlib.h>#include<margo.h>intmain(intargc,char**argv){if(argc!=2){fprintf(stderr,"Usage: %s <server address>\n",argv[0]);exit(0);}hg_return_tret;margo_instance_idmid=MARGO_INSTANCE_NULL;mid=margo_init("tcp",MARGO_CLIENT_MODE,0,0);assert(mid);hg_id_thello_rpc_id=MARGO_REGISTER(mid,"hello",void,void,NULL);margo_registered_disable_response(mid,hello_rpc_id,HG_TRUE);hg_addr_tsvr_addr;// server’s address must be resolved into a hg_addr_t object.
ret=margo_addr_lookup(mid,argv[1],&svr_addr);assert(ret==HG_SUCCESS);hg_handle_thandle;// create a hg_handle_t object.
ret=margo_create(mid,svr_addr,hello_rpc_id,&handle);assert(ret==HG_SUCCESS);// sends the request to the server
ret=margo_forward(handle,NULL);assert(ret==HG_SUCCESS);ret=margo_destroy(handle);assert(ret==HG_SUCCESS);ret=margo_addr_free(mid,svr_addr);assert(ret==HG_SUCCESS);margo_finalize(mid);return0;}
sum example
message define
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#ifndef PARAM_H
#define PARAM_H

#include <mercury.h>
#include <mercury_macros.h>

/* We use the Mercury macros to define the input
 * and output structures along with the serialization
 * functions.
 */

/* Input of the "sum" RPC: the two 32-bit operands x and y. */
MERCURY_GEN_PROC(sum_in_t,
        ((int32_t)(x))\
        ((int32_t)(y)))

/* Output of the "sum" RPC: the 32-bit result ret. */
MERCURY_GEN_PROC(sum_out_t, ((int32_t)(ret)))

#endif
The <mercury_proc_string.h> header may also be included. It provides the hg_string_t and hg_const_string_t types, which are typedefs of char* and const char* respectively and must be used to represent null-terminated strings.
The structures defined with the MERCURY_GEN_PROC cannot contain pointers (apart from the hg_string_t and hg_const_string_t types). We will see in a future tutorial how to define serialization functions for more complex structures.
#include<assert.h>#include<stdio.h>#include<margo.h>#include"types.h"typedefstruct{intmax_rpcs;intnum_rpcs;}server_data;staticvoidsum(hg_handle_th);DECLARE_MARGO_RPC_HANDLER(sum)intmain(intargc,char**argv){margo_instance_idmid=margo_init("tcp",MARGO_SERVER_MODE,0,0);assert(mid);server_datasvr_data={.max_rpcs=4,.num_rpcs=0};hg_addr_tmy_address;margo_addr_self(mid,&my_address);charaddr_str[128];size_taddr_str_size=128;margo_addr_to_string(mid,addr_str,&addr_str_size,my_address);margo_addr_free(mid,my_address);margo_info(mid,"Server running at address %s",addr_str);hg_id_trpc_id=MARGO_REGISTER(mid,"sum",sum_in_t,sum_out_t,sum);//attach data to an RPC handler.
margo_register_data(mid,rpc_id,&svr_data,NULL);margo_wait_for_finalize(mid);return0;}staticvoidsum(hg_handle_th){hg_return_tret;sum_in_tin;sum_out_tout;margo_instance_idmid=margo_hg_handle_get_instance(h);margo_set_log_level(mid,MARGO_LOG_INFO);conststructhg_info*info=margo_get_info(h);server_data*svr_data=(server_data*)margo_registered_data(mid,info->id);// deserialize the content of the RPC’s data into the variable in
ret=margo_get_input(h,&in);assert(ret==HG_SUCCESS);out.ret=in.x+in.y;margo_trace(mid,"Computed %d + %d = %d",in.x,in.y,out.ret);// sent result back to the client
ret=margo_respond(h,&out);assert(ret==HG_SUCCESS);ret=margo_free_input(h,&in);assert(ret==HG_SUCCESS);ret=margo_destroy(h);assert(ret==HG_SUCCESS);svr_data->num_rpcs+=1;if(svr_data->num_rpcs==svr_data->max_rpcs){margo_finalize(mid);}}DEFINE_MARGO_RPC_HANDLER(sum)
An input deserialized using margo_get_input should be freed using margo_free_input.
#include<assert.h>#include<stdio.h>#include<stdlib.h>#include<margo.h>#include"types.h"intmain(intargc,char**argv){if(argc!=2){fprintf(stderr,"Usage: %s <server address>\n",argv[0]);exit(0);}margo_instance_idmid=margo_init("tcp",MARGO_CLIENT_MODE,0,0);margo_set_log_level(mid,MARGO_LOG_INFO);// register the sent and received type
hg_id_tsum_rpc_id=MARGO_REGISTER(mid,"sum",sum_in_t,sum_out_t,NULL);hg_addr_tsvr_addr;margo_addr_lookup(mid,argv[1],&svr_addr);inti;sum_in_targs;for(i=0;i<4;i++){args.x=42+i*2;args.y=42+i*2+1;hg_handle_th;margo_create(mid,svr_addr,sum_rpc_id,&h);margo_forward(h,&args);sum_out_tresp;// deserialized the value returned by the server into the resp variable.
margo_get_output(h,&resp);margo_info(mid,"Got response: %d+%d = %d\n",args.x,args.y,resp.ret);margo_free_output(h,&resp);margo_destroy(h);}margo_addr_free(mid,svr_addr);margo_finalize(mid);return0;}
We must call margo_free_output on the client side because the output has been obtained using margo_get_output.
We can use the function margo_forward_timed to set a timeout for the RPC call.
The hg_bulk_t opaque type represents a handle to a region of memory in a process. In addition to this handle, we add a field n that will tell us how many values are in the buffer.
margo_bulk_create is used to create an hg_bulk_t handle representing the segment of memory exposed by the client.
margo_instance_id
the number of segments to expose
void** array of addresses pointing to each segment
a hg_size_t* array of sizes for each segment
the mode used to expose the memory region. HG_BULK_READ_ONLY indicates that Margo will only read (i.e., the server will only pull) from this segment. HG_BULK_WRITE_ONLY indicates that Margo will only write to the segment and HG_BULK_READWRITE indicates that both operations may happen.
#include<assert.h>#include<stdio.h>#include<stdlib.h>#include<margo.h>#include"types.h"staticconstintTOTAL_RPCS=16;staticintnum_rpcs=0;staticvoidsum(hg_handle_th);DECLARE_MARGO_RPC_HANDLER(sum)intmain(intargc,char**argv){margo_instance_idmid=margo_init("tcp",MARGO_SERVER_MODE,0,0);assert(mid);margo_set_log_level(mid,MARGO_LOG_INFO);hg_addr_tmy_address;margo_addr_self(mid,&my_address);charaddr_str[128];size_taddr_str_size=128;margo_addr_to_string(mid,addr_str,&addr_str_size,my_address);margo_addr_free(mid,my_address);margo_info(mid,"Server running at address %s\n",addr_str);MARGO_REGISTER(mid,"sum",sum_in_t,sum_out_t,sum);margo_wait_for_finalize(mid);return0;}staticvoidsum(hg_handle_th){hg_return_tret;num_rpcs+=1;sum_in_tin;sum_out_tout;int32_t*values;hg_bulk_tlocal_bulk;margo_instance_idmid=margo_hg_handle_get_instance(h);conststructhg_info*info=margo_get_info(h);hg_addr_tclient_addr=info->addr;ret=margo_get_input(h,&in);assert(ret==HG_SUCCESS);values=calloc(in.n,sizeof(*values));hg_size_tbuf_size=in.n*sizeof(*values);ret=margo_bulk_create(mid,1,(void**)&values,&buf_size,HG_BULK_WRITE_ONLY,&local_bulk);assert(ret==HG_SUCCESS);ret=margo_bulk_transfer(mid,HG_BULK_PULL,client_addr,in.bulk,0,local_bulk,0,buf_size);assert(ret==HG_SUCCESS);out.ret=0;inti;for(i=0;i<in.n;i++){out.ret+=values[i];}ret=margo_respond(h,&out);assert(ret==HG_SUCCESS);ret=margo_bulk_free(local_bulk);assert(ret==HG_SUCCESS);free(values);ret=margo_free_input(h,&in);assert(ret==HG_SUCCESS);ret=margo_destroy(h);assert(ret==HG_SUCCESS);if(num_rpcs==TOTAL_RPCS){margo_finalize(mid);}}DEFINE_MARGO_RPC_HANDLER(sum)
#include<assert.h>#include<stdio.h>#include<stdlib.h>#include<margo.h>#include"types.h"intmain(intargc,char**argv){if(argc!=2){fprintf(stderr,"Usage: %s <server address>\n",argv[0]);exit(0);}margo_instance_idmid=margo_init("tcp",MARGO_CLIENT_MODE,0,0);margo_set_log_level(mid,MARGO_LOG_DEBUG);hg_id_tsum_rpc_id=MARGO_REGISTER(mid,"sum",sum_in_t,sum_out_t,NULL);hg_addr_tsvr_addr;margo_addr_lookup(mid,argv[1],&svr_addr);inti;sum_in_targs;for(i=0;i<4;i++){args.x=42+i*2;args.y=42+i*2+1;hg_handle_th;margo_create(mid,svr_addr,sum_rpc_id,&h);margo_requestreq;margo_iforward(h,&args,&req);margo_debug(mid,"Waiting for reply...");margo_wait(req);sum_out_tresp;margo_get_output(h,&resp);margo_debug(mid,"Got response: %d+%d = %d",args.x,args.y,resp.ret);margo_free_output(h,&resp);margo_destroy(h);}margo_addr_free(mid,svr_addr);margo_finalize(mid);return0;}
Instead of using margo_forward, we use margo_iforward. This function returns immediately after having sent the RPC to the server. It also takes an extra argument of type margo_request*. The client will use this request object to check the status of the RPC.
We then use margo_wait on the request to block until we have received a response from the server. Alternatively, margo_test can be used to check whether the server has sent a response, without blocking if it hasn’t.
It is safe to delete or modify the RPC’s input right after the call to margo_iforward. margo_iforward indeed returns after having serialized this input into its send buffer.
#include<assert.h>#include<stdio.h>#include<margo.h>#include"types.h"staticconstintTOTAL_RPCS=16;staticintnum_rpcs=0;staticvoidsum(hg_handle_th);DECLARE_MARGO_RPC_HANDLER(sum)intmain(intargc,char**argv){margo_instance_idmid=margo_init("tcp",MARGO_SERVER_MODE,0,0);assert(mid);margo_set_log_level(mid,MARGO_LOG_INFO);hg_addr_tmy_address;margo_addr_self(mid,&my_address);charaddr_str[128];size_taddr_str_size=128;margo_addr_to_string(mid,addr_str,&addr_str_size,my_address);margo_addr_free(mid,my_address);margo_info(mid,"Server running at address %s",addr_str);MARGO_REGISTER(mid,"sum",sum_in_t,sum_out_t,sum);margo_wait_for_finalize(mid);return0;}staticvoidsum(hg_handle_th){hg_return_tret;num_rpcs+=1;sum_in_tin;sum_out_tout;margo_instance_idmid=margo_hg_handle_get_instance(h);ret=margo_get_input(h,&in);assert(ret==HG_SUCCESS);out.ret=in.x+in.y;margo_info(mid,"Computed %d + %d = %d",in.x,in.y,out.ret);margo_thread_sleep(mid,1000);margo_requestreq;ret=margo_irespond(h,&out,&req);assert(ret==HG_SUCCESS);/* ... do other work ... */ret=margo_wait(req);assert(ret==HG_SUCCESS);ret=margo_free_input(h,&in);assert(ret==HG_SUCCESS);ret=margo_destroy(h);assert(ret==HG_SUCCESS);if(num_rpcs==TOTAL_RPCS){margo_finalize(mid);}}DEFINE_MARGO_RPC_HANDLER(sum)
margo_respond (the blocking version) returns when the response has been sent, but does not guarantee that the client has received it. Its behavior is not very different from margo_irespond, which returns as soon as the response has been scheduled for sending. Hence it is unlikely that you will ever need margo_irespond.
Just like there is a margo_forward_timed, there is a margo_iforward_timed, which takes an additional parameter (before the request pointer) indicating a timeout in milliseconds. Should the server not respond within this time limit, the call to margo_wait on the resulting request will return HG_TIMEOUT.