赞
踩
本例使用一个server进程作为多个进程间消息通讯的中转站,其作用相当于网络中的switch。可以实现多个进程间的数据收发。本例稍加修改,也可以作为网络中不同进程的通信服务。
运行服务器命令:
./server
运行客户端:
./client A B "Hi B, This is A."
./client B A "Hi A, This is B."
第一个参数为 source ID,第二个参数为 destination ID。目前 ID 只支持一个字符。另外,本程序不支持客户端的动态登录和退出。
源代码 client.c :
#include
#include
#include
#include
#include
#include
#include
#include
#include
/*
 * Report a failed call and terminate the process.
 *
 * Prints msg followed by the description of the current errno via perror(),
 * then exits with status 1.  The body is wrapped in do { ... } while (0) so
 * the macro expands to exactly one statement and stays correct inside an
 * unbraced if/else.  Every line of the body must end in a backslash;
 * otherwise only the first line belongs to the macro.
 */
#define handle_error(msg) \
    do {                  \
        perror(msg);      \
        exit(1);          \
    } while (0)
/* Forward declarations for the helpers defined further down in client.c. */
void dumpData(char buffer[], char length);  /* prints `length` bytes of `buffer` as characters */
void *WorkThread_RecvData(void *args);      /* pthread entry point: receives on client_sockfd */

/* State shared between main() and the receive thread. */
int client_sockfd;           /* socket connected to the server */
fd_set watchset;             /* descriptor set the receive thread waits on */
fd_set catch_fd_set;         /* scratch copy of watchset that select() overwrites */
char sendBuffer[256] = {0};  /* outgoing frame: [0] destination id, [1] length, [2..] text */
char rcvBuffer[256] = {0};   /* incoming frame, filled until 256 bytes have arrived */
/*
 * Client entry point.
 *
 * Usage: ./client <source-id> <destination-id> <message>
 *
 * Binds a UNIX-domain stream socket to the path <source-id>, connects it to
 * the server listening on "server_socket", sends one 256-byte frame and then
 * waits for the receive thread, which prints every frame that comes back.
 *
 * Frame layout: byte 0 is the first character of <destination-id>, byte 1 is
 * the payload length, and the NUL-terminated payload starts at byte 2.
 *
 * Returns EXIT_SUCCESS once the receive thread has finished; exits with
 * EXIT_FAILURE on a usage error or when any socket/thread call fails.
 */
int main(int argc, char * argv[])
{
    enum {
        FRAME_SIZE  = 256,  /* every frame written to the server has this size */
        HEADER_SIZE = 2,    /* destination id byte + payload length byte */
        MAX_PAYLOAD = 127   /* the length is carried in one char and handed to
                               dumpData(char[], char); larger values would go
                               negative where char is signed */
    };
    struct sockaddr_un server_sockaddr;
    struct sockaddr_un cli_sockaddr;
    pthread_t ntid;
    size_t payload_len;
    size_t sent;
    ssize_t written;
    char dst_module_id;
    char messageLength;
    int err;

    if (argc != 4)
    {
        printf("Parameter Not Correct.\n");
        exit(EXIT_FAILURE);
    }

    printf("argc = %d \n", argc);
    printf("source argv[1] = %s \n", argv[1]);
    printf("destination argv[2] = %s \n", argv[2]);
    printf("message argv[3] = %s \n", argv[3]);

    if (argv[1][0] == '\0' || argv[2][0] == '\0')
    {
        fprintf(stderr, "Client::source and destination IDs must not be empty\n");
        exit(EXIT_FAILURE);
    }
    /* The source id doubles as the socket path, so it has to fit sun_path. */
    if (strlen(argv[1]) >= sizeof(cli_sockaddr.sun_path))
    {
        fprintf(stderr, "Client::source ID is too long for a socket path\n");
        exit(EXIT_FAILURE);
    }
    payload_len = strlen(argv[3]);
    if (payload_len > MAX_PAYLOAD)
    {
        fprintf(stderr, "Client::message too long (%lu bytes, max %d)\n",
                (unsigned long)payload_len, (int)MAX_PAYLOAD);
        exit(EXIT_FAILURE);
    }

    /* Build the frame; the length check above keeps this inside sendBuffer. */
    dst_module_id = argv[2][0];
    messageLength = (char)payload_len;
    memset(sendBuffer, 0, FRAME_SIZE);
    sendBuffer[0] = dst_module_id;
    sendBuffer[1] = messageLength;
    memcpy(sendBuffer + HEADER_SIZE, argv[3], payload_len + 1);

    client_sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (client_sockfd < 0)
    {
        perror("Client::error socket");
        exit(EXIT_FAILURE);
    }

    /* Remove a stale socket file left by an earlier run, then bind to the
       source id so the server can see this client's id in the peer address. */
    unlink(argv[1]);
    memset(&cli_sockaddr, 0, sizeof(cli_sockaddr));
    cli_sockaddr.sun_family = AF_UNIX;
    strcpy(cli_sockaddr.sun_path, argv[1]);
    if (bind(client_sockfd, (struct sockaddr *)&cli_sockaddr,
             sizeof(cli_sockaddr)) < 0)
    {
        perror("Client::error bind");
        exit(EXIT_FAILURE);
    }

    memset(&server_sockaddr, 0, sizeof(server_sockaddr));
    server_sockaddr.sun_family = AF_UNIX;
    strcpy(server_sockaddr.sun_path, "server_socket");
    if (connect(client_sockfd, (struct sockaddr *)&server_sockaddr,
                sizeof(server_sockaddr)) < 0)
    {
        /* Do not fall through and report success on a dead socket. */
        perror("Client::error on connecting");
        exit(EXIT_FAILURE);
    }
    printf("Client::succeed in connecting with server\n");

    /* NOTE(review): presumably this pause gives the peer client time to
       connect before the frame is sent - confirm against the server. */
    sleep(10);

    /* A stream socket may accept fewer bytes than requested; keep writing
       until the whole frame has gone out. */
    for (sent = 0; sent < FRAME_SIZE; sent += (size_t)written)
    {
        written = write(client_sockfd, sendBuffer + sent, FRAME_SIZE - sent);
        if (written <= 0)
        {
            perror("Client::error write");
            exit(EXIT_FAILURE);
        }
    }
    printf("Client::Write dst module id %d\n", dst_module_id);
    printf("Client::Write buffer length %d\n", messageLength);
    printf("Client::Write message --\n");
    dumpData((char *)sendBuffer + HEADER_SIZE, messageLength);

    err = pthread_create(&ntid, NULL, WorkThread_RecvData, NULL);
    if (err != 0)
    {
        printf("Cannot create thread ... %s\n", strerror(err));
        close(client_sockfd);
        exit(EXIT_FAILURE);
    }

    /* Block until the receive thread ends instead of spinning a CPU core. */
    pthread_join(ntid, NULL);
    close(client_sockfd);
    return EXIT_SUCCESS;
}
/*
 * Print the first `length` bytes of `buffer` to stdout as characters.
 *
 * A newline is written before every group of 16 characters (including the
 * first group) and once more after the last character, so a zero or negative
 * `length` prints a single newline.
 */
void dumpData(char buffer[], char length)
{
    const char *cursor = buffer;
    int remaining = length;
    int column = 0;   /* position inside the current 16-character row */

    while (remaining-- > 0)
    {
        if (column == 0)
        {
            putchar('\n');
        }
        putchar(*cursor++);
        column = (column + 1) % 16;
    }
    putchar('\n');
}
/*
 * Receive-thread entry point: collect 256-byte frames arriving on
 * client_sockfd and print the payload of each complete frame.
 *
 * Bytes are accumulated in rcvBuffer until a full frame is present; the
 * payload (starting at byte 2, length taken from byte 1) is then printed with
 * dumpData() and the buffer is cleared for the next frame.
 *
 * args is unused.  Returns NULL after the peer closes the connection or
 * recv() fails.  If select() fails for any reason other than an interrupting
 * signal, handle_error() terminates the process.
 */
void* WorkThread_RecvData(void * args)
{
    int rcd;
    int len;
    int dataRecv = 0;   /* bytes of the current frame received so far */

    (void)args;

    FD_ZERO(&watchset);
    FD_SET(client_sockfd, &watchset);

    while (1)
    {
        /* select() overwrites its set, so hand it a fresh copy each round. */
        catch_fd_set = watchset;
        rcd = select(client_sockfd + 1, &catch_fd_set, NULL, NULL, NULL);
        if (rcd < 0)
        {
            if (errno == EINTR)
            {
                continue;   /* interrupted by a signal: just wait again */
            }
            handle_error("error select");
        }
        if (!FD_ISSET(client_sockfd, &catch_fd_set))
        {
            continue;
        }

        len = (int)recv(client_sockfd, rcvBuffer + dataRecv, 256 - dataRecv, 0);
        if (len < 0)
        {
            printf("Failed to receive data, with error code %d '%s'\n", errno,
                   strerror(errno));
            break;
        }
        if (len == 0)
        {
            printf("Peer exit already.\n");
            break;
        }
        printf("Got data successfully, totally %d bytes.\n", len);

        /* Only count bytes that were actually received. */
        dataRecv += len;
        if (dataRecv == 256)
        {
            dumpData((char *)rcvBuffer + 2, rcvBuffer[1]);
            dataRecv = 0;
            memset(rcvBuffer, 0, 256);
        }
    }

    return NULL;
}
源代码 server.c
#include
#include
#include
#include
#include
#include
#include
#include
#include
/*
 * Report a failed call and terminate the server.
 *
 * Prints msg followed by the description of the current errno via perror(),
 * then exits with status 1.  The body is wrapped in do { ... } while (0) so
 * the macro expands to exactly one statement and stays correct inside an
 * unbraced if/else.  Every line of the body must end in a backslash;
 * otherwise only the first line belongs to the macro.
 */
#define handle_error(msg) \
    do {                  \
        perror(msg);      \
        exit(1);          \
    } while (0)
/* Prints `length` bytes of `buffer`; its definition is not in the visible
   part of this listing. */
void dumpData(char buffer[], char length);

/* Per-client receive state; main() indexes both arrays with (ci - 1). */
char rcdBuffer[2 * 256] = {0};  /* two 256-byte frame buffers, back to back */
int dataRecv[2] = {0};          /* bytes received so far into each buffer */
int main()
{
int
rcd;
struct
sockaddr_un server_sockaddr;
int
backlog;
ushort
ci;
int
watch_fd_list[3];
fd_set
catch_fd_set;
fd_set
watchset;
int
new_cli_fd;
int
maxfd;
int socklen,
server_len;
struct
sockaddr_un cli_sockaddr;
struct
{
char
module_id; int cli_sock_fd; }
cli_info_t[2];
for (ci =
0; ci <= 1; ci++)
{
cli_info_t[ci].cli_sock_fd = -1;
}
for (ci =
0; ci <= 2; ci++)
{
watch_fd_list[ci] = -1;
}
int
server_sockfd, client_sockfd;
unlink("server_socket");
server_sockfd = socket( AF_UNIX, SOCK_STREAM, 0 );
if
(server_sockfd == -1)
{
handle_error("error socket");
}
server_sockaddr.sun_family = AF_UNIX;
strcpy(
server_sockaddr.sun_path, "server_socket" );
server_len =
sizeof(server_sockaddr);
rcd = bind(
server_sockfd, ( struct sockaddr * )&server_sockaddr,
server_len );
if (rcd ==
-1)
{
handle_error("error bind");
}
backlog =
5;
rcd =
listen( server_sockfd, backlog );
if (rcd ==
-1)
{
handle_error("error listen");
}
printf("SERVER::Server is waitting on socket=%d
\n", server_sockfd);
watch_fd_list[0] = server_sockfd;
FD_ZERO(
&watchset );
FD_SET(
server_sockfd, &watchset );
maxfd =
watch_fd_list[0];
while
(1)
{
int length;
int fd;
catch_fd_set = watchset;
rcd = select( maxfd + 1, &catch_fd_set, NULL, NULL, (struct
timeval *)0 );
if (rcd < 0)
{
printf("SERVER::Server \n");
exit(1);
}
if (FD_ISSET( server_sockfd, &catch_fd_set ))
{
socklen = sizeof(cli_sockaddr);
new_cli_fd = accept( server_sockfd, ( struct sockaddr *
)&(cli_sockaddr), &socklen );
printf("SERVER::open communication with Client %s
on socket %d\n", cli_sockaddr.sun_path, new_cli_fd);
for (ci = 1; ci <= 2; ci++)
{
if (watch_fd_list[ci] != -1)
{
continue;
}
else
{
watch_fd_list[ci] = new_cli_fd;
break;
}
}
FD_SET(new_cli_fd, &watchset );
if (maxfd < new_cli_fd)
{
maxfd = new_cli_fd;
}
for (ci = 0; ci <= 1; ci++)
{
if (cli_info_t[ci].cli_sock_fd == -1)
{
cli_info_t[ci].module_id =
cli_sockaddr.sun_path[0];
cli_info_t[ci].cli_sock_fd = new_cli_fd;
break;
}
}
continue;
}
for (ci = 1; ci <= 2; ci++)
{
int dst_fd = -1;
char dst_module_id;
char src_module_id;
int i;
if (watch_fd_list[ ci ] == -1)
{
continue;
}
if (!FD_ISSET( watch_fd_list[ ci ], &catch_fd_set ))
{
continue;
}
//The first byte is destination module. The second byte is the
length of following data.
//revd( watch_fd_list[ ci ], &dst_module_id, 1 ) ;
length = recv( watch_fd_list[ ci ], (char *)rcdBuffer
+ (ci - 1) * 256 + dataRecv[ci - 1], 256 - dataRecv[ci - 1],
0);
dataRecv[ci - 1] += length;
if (length > 0)
{
printf("Got data successfully, totally %d bytes.\n", length);
}
else
{
if (length < 0)
{
printf("Failed to receive data, with error code %d '%s'\n", errno,
strerror(errno));
}
else
{
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。