-
Notifications
You must be signed in to change notification settings - Fork 0
/
publisher.c
152 lines (113 loc) · 4.14 KB
/
publisher.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#include "header.h"
/* Identifier this publisher places in the client_id field of each request it builds for options 1 and 2. */
#define CLIENT_ID 1
/* Log file opened (and truncated) by main() at start-up. */
#define PATH_LOG "publisher.log"
/* Sends request t1 to the broker on sock. Defined outside this file;
 * NOTE(review): meaning of the int return value is not visible here - confirm. */
int send_master(int sock, master t1);
/* Sends request t1 on sockfd, reads one reply and returns the reply's msg_id. */
int get_max_msgid(int sockfd, master t1);
/* Handle of the log file PATH_LOG; opened in main(). */
FILE *fptr;
/*
 * Read one whitespace-delimited word from stdin into dst.
 *
 * cap is the full capacity of dst including the terminating NUL. At most
 * cap - 1 characters are stored; the rest of an over-long word is discarded
 * so it can neither overflow dst nor be mistaken for the next answer.
 *
 * Returns 0 on success, -1 on end of input or read error.
 */
static int pub_read_word(char *dst, size_t cap)
{
    char fmt[32];

    if (cap < 2)
        return -1;

    /* Build "%<cap-1>s" so scanf() cannot write past the end of dst. */
    snprintf(fmt, sizeof fmt, "%%%zus", cap - 1);
    if (scanf(fmt, dst) != 1)
        return -1;

    /* Drop the unread tail of an over-long word; matches nothing otherwise. */
    (void)scanf("%*[^ \t\n\v\f\r]");
    return 0;
}

/*
 * Ask the broker for the highest message id of t1->topic_name and post
 * t1->message as the next message (id 1 when the broker reports 0).
 *
 * Returns the value reported by get_max_msgid().
 */
static int pub_query_max_id(int sock, master *t1)
{
    t1->type = GET_MAX_MSGID;
    t1->client_id = CLIENT_ID;
    t1->broker_id = -1;
    return get_max_msgid(sock, *t1);
}

/* Send t1 as a CREATE_MSG request whose id follows max_id. */
static void pub_post(int sock, master *t1, int max_id)
{
    t1->type = CREATE_MSG;
    t1->msg_id = max_id + 1;   /* max_id == 0 means "no such topic" -> first id is 1 */
    t1->broker_id = -1;
    send_master(sock, *t1);
}

/*
 * Menu option 1: create a topic (or append to it when it already exists).
 * Returns 0 on success, -1 when stdin is exhausted.
 *
 * NOTE(review): sizeof on topic_name/message assumes both are char arrays
 * inside master (declared in header.h) - confirm.
 */
static int pub_create_topic(int sock)
{
    master t1;

    /* Zero the request so no indeterminate bytes go out on the wire. */
    memset(&t1, 0, sizeof t1);

    printf("\nplease enter topic name that you want to create\n");
    if (pub_read_word(t1.topic_name, sizeof t1.topic_name) < 0)
        return -1;

    int reply = pub_query_max_id(sock, &t1);
    if (reply == 0)
        printf("\nplease enter the first message that you want to post\n");
    else
        printf("\ntopic already exists, enter the message that you want to post\n");

    if (pub_read_word(t1.message, sizeof t1.message) < 0)
        return -1;

    pub_post(sock, &t1, reply);
    return 0;
}

/*
 * Menu option 2: post a message to a topic, creating the topic if needed.
 * Returns 0 on success, -1 when stdin is exhausted.
 */
static int pub_send_message(int sock)
{
    master t1;

    memset(&t1, 0, sizeof t1);

    printf("\nplease enter the topic you want to post the message to\n");
    if (pub_read_word(t1.topic_name, sizeof t1.topic_name) < 0)
        return -1;
    printf("\nplease enter the message that you want to send\n");
    if (pub_read_word(t1.message, sizeof t1.message) < 0)
        return -1;

    int reply = pub_query_max_id(sock, &t1);
    if (reply == 0)
        printf("\ntopic does not exists, topic will be created and message wll be posted\n");

    pub_post(sock, &t1, reply);
    return 0;
}

/*
 * Menu option 3: read the named file.
 * Returns 0 on success, -1 when stdin is exhausted. Exits the program when
 * the file cannot be opened (unchanged from the previous behaviour).
 *
 * TODO: nothing is transmitted yet; the original code only opened the file.
 * The wire format for file transfer is not defined in this file.
 */
static int pub_send_file(void)
{
    master t1;
    char file_name[100];

    memset(&t1, 0, sizeof t1);
    t1.type = 3;

    printf("\nplease enter the topic you want to post the message to\n");
    if (pub_read_word(t1.topic_name, sizeof t1.topic_name) < 0)
        return -1;
    printf("\nplease enter the file name you want to send\n");
    if (pub_read_word(file_name, sizeof file_name) < 0)
        return -1;

    FILE *in_file = fopen(file_name, "r");
    if (in_file == NULL) {
        printf("Cannot open file \n");
        exit(0);
    }

    /* fgetc() returns int: a char cannot reliably hold EOF. */
    long count = 0;
    int c;
    while ((c = fgetc(in_file)) != EOF)
        count++;
    fclose(in_file);

    printf("read %ld bytes from %s; sending files is not implemented yet\n",
           count, file_name);
    return 0;
}

/*
 * Publisher client entry point.
 *
 * Usage: ./a.out <BrokerIP> <BrokerPort>
 *
 * Connects to the broker over TCP and then serves an interactive menu on
 * stdin until end of input: 1 - create topic, 2 - send message, 3 - send file.
 */
int main(int argc, char *argv[]){
    if(argc != 3){
        printf("\nUsage : ./a.out <BrokerIP> <BrokerPort>\n");
        exit(0);
    }

    /* Validate the port up front; atoi() would silently turn junk into 0. */
    char *port_end;
    long port = strtol(argv[2], &port_end, 10);
    if (port_end == argv[2] || *port_end != '\0' || port < 1 || port > 65535) {
        fprintf(stderr, "invalid broker port: %s\n", argv[2]);
        exit(EXIT_FAILURE);
    }

    if((fptr = fopen(PATH_LOG,"w")) == NULL)
        DieWithError("log file fopen()");

    int sock;
    struct sockaddr_in echoServAddr;

    if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
        DieWithError("socket() failed");

    memset(&echoServAddr, 0, sizeof(echoServAddr));
    echoServAddr.sin_family = AF_INET;
    echoServAddr.sin_addr.s_addr = inet_addr(argv[1]);
    /* inet_addr() reports an unparsable address as INADDR_NONE. */
    if (echoServAddr.sin_addr.s_addr == INADDR_NONE) {
        fprintf(stderr, "invalid broker address: %s\n", argv[1]);
        exit(EXIT_FAILURE);
    }
    echoServAddr.sin_port = htons((unsigned short)port);

    if (connect(sock, (struct sockaddr *) &echoServAddr,sizeof(echoServAddr)) < 0)
        DieWithError("connect() failed");
    else
        printf("\nconnected to the broker");

    for (;;) {
        char op_type;
        int status = 0;

        printf("please select the operation that you want to perform\n");
        printf("press: 1 - create topic, 2 - send message , 3 - send file\n");

        /* The leading space skips the newline left over from the previous
         * answer; without it the menu would be printed twice per command. */
        if (scanf(" %c", &op_type) != 1)
            break;              /* end of input: leave the menu */

        switch (op_type) {
        case '1':
            status = pub_create_topic(sock);
            break;
        case '2':
            status = pub_send_message(sock);
            break;
        case '3':
            status = pub_send_file();
            break;
        default:
            /* Unknown choice: show the menu again. */
            break;
        }

        if (status < 0)
            break;              /* stdin ran dry in the middle of a command */
    }

    printf("\n");
    close(sock);
    exit(0);
}
/*
 * Send request t1 to the broker on sockfd and wait for one complete reply.
 *
 * The reply is printed with display() and its msg_id field is returned.
 * Callers in this file treat a return value of 0 as "topic does not exist".
 *
 * Any I/O failure is reported through DieWithError().
 * NOTE(review): this assumes DieWithError() does not return - confirm.
 */
int get_max_msgid(int sockfd, master t1){
    /* A short write would leave the broker with a truncated request, so
     * anything other than the full struct counts as a failure. */
    ssize_t sent = write(sockfd, &t1, sizeof(t1));
    if (sent != (ssize_t)sizeof(t1))
        DieWithError("send() sent a different number of bytes than expected");

    master reply_struct;
    char *dst = (char *)&reply_struct;
    size_t received = 0;

    /* TCP is a byte stream: one read() may deliver only part of the reply,
     * so keep reading until the whole struct has arrived. */
    while (received < sizeof(reply_struct)) {
        ssize_t n = read(sockfd, dst + received, sizeof(reply_struct) - received);
        if (n < 0)
            DieWithError("could not read while get_max_msgid");
        if (n == 0)
            DieWithError("broker closed the connection while get_max_msgid");
        received += (size_t)n;
    }

    printf("Reply of get_max_msgid = \n");
    display(reply_struct,1,1,1,1,1,1);
    return reply_struct.msg_id;
}