Index: daemon/events.c
===================================================================
--- daemon/events.c	(.../spread-4.0.0)	(revision 16)
+++ daemon/events.c	(.../spread-4.0.0-poll-epoll.4.1)	(revision 16)
@@ -50,6 +50,11 @@
 #include
 #include
 #include
+
+#ifdef HAVE_EPOLL
+#include <sys/epoll.h>
+#endif
+
 #else	/* ARCH_PC_WIN95 */
 
 #include
@@ -84,13 +89,29 @@
     int         num_fds;
     int         num_active_fds;
     fd_event    events[MAX_FD_EVENTS];
+#if defined(HAVE_EPOLL)
+    int         epoll_fd;
+    int         priority;
+    /* epoll_event_map is a simplistic hash table that maps a file
+       descriptor to an index into the events array above. */
+    short       epoll_event_map[MAX_FD_EVENTS];
+    struct epoll_event epoll_events[MAX_FD_EVENTS];
+#endif
 } fd_queue;
 
 static  time_event  *Time_queue;
 static  sp_time     Now;
 
 static  fd_queue    Fd_queue[NUM_PRIORITY];
+#if defined(HAVE_EPOLL)
+static  int         epoll_fd;
+static  int         SelectToEpoll[NUM_FDTYPES] = { EPOLLIN , EPOLLOUT, EPOLLERR };
+#define EPOLL_EVENT_INDEX(e) (*((short *)(e).data.ptr));
+#define EPOLL_MAP_INDEX(fd) ((fd) % MAX_FD_EVENTS)
+#define EPOLL_DATA_PTR(priority, fd) (&(Fd_queue[priority].epoll_event_map[EPOLL_MAP_INDEX(fd)]))
+#else
 static  fd_set      Fd_mask[NUM_FDTYPES];
+#endif
 static  int         Active_priority;
 static  int         Exit_events;
@@ -110,11 +131,40 @@
     {
         Fd_queue[i].num_fds        = 0;
         Fd_queue[i].num_active_fds = 0;
+#if defined(HAVE_EPOLL)
+        Fd_queue[i].priority       = i;
+#endif
     }
 
+#if defined(HAVE_EPOLL)
+    /*
+     * Create an epoll descriptor to test which queues have
+     * ready I/O events.
+     */
+    if((epoll_fd = epoll_create(NUM_PRIORITY)) < 0)
+        Alarm(EXIT, "E_Init: Failed to create epoll_fd.\n");
+
+    /*
+     * Create queue-specific epoll descriptors to retrieve
+     * ready I/O events for each queue.
+     */
+    for (i = 0; i < NUM_PRIORITY; ++i) {
+        struct epoll_event event;
+        event.events   = EPOLLIN | EPOLLOUT | EPOLLERR;
+        event.data.ptr = &Fd_queue[i];
+        if((Fd_queue[i].epoll_fd = epoll_create(MAX_FD_EVENTS)) < 0)
+            Alarm(EXIT,
+                  "E_Init: Failed to create Fd_queue[%d].epoll_fd.\n", i);
+        if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, Fd_queue[i].epoll_fd,
+                     &event) != 0)
+            Alarm(EXIT, "E_Init: Failed to add Fd_queue[%d].epoll_fd. "
+                  "Error: %d\n", i, errno);
+    }
+#else
     for ( i=0; i < NUM_FDTYPES; i++ )
     {
        FD_ZERO( &Fd_mask[i] );
     }
+#endif
 
     Active_priority = LOW_PRIORITY;
 
     E_get_time();
@@ -368,12 +418,15 @@
 /* Windows bug: Reports FD_SETSIZE of 64 but select works on all
  * fd's even ones with numbers greater then 64.
  */
+#if !defined(HAVE_EPOLL)
     if( fd < 0 || fd > FD_SETSIZE )
     {
         Alarm( PRINT, "E_attach_fd: invalid fd %d (max %d) with fd_type %d with priority %d\n", fd, FD_SETSIZE, fd_type, priority );
         return( -1 );
     }
 #endif
+
+#endif
 
     for( j=0; j < Fd_queue[priority].num_fds; j++ )
     {
         if( ( Fd_queue[priority].events[j].fd == fd ) &&
             ( Fd_queue[priority].events[j].fd_type == fd_type ) )
@@ -403,7 +456,24 @@
     Fd_queue[priority].events[num_fds].active = TRUE;
     Fd_queue[priority].num_fds++;
     Fd_queue[priority].num_active_fds++;
-    if( Active_priority <= priority ) FD_SET( fd, &Fd_mask[fd_type] );
+    if( Active_priority <= priority ) {
+#if defined(HAVE_EPOLL)
+        struct epoll_event event;
+        event.events   = SelectToEpoll[fd_type];
+        event.data.ptr = EPOLL_DATA_PTR(priority, fd);
+        Fd_queue[priority].epoll_event_map[EPOLL_MAP_INDEX(fd)] = num_fds;
+        if(epoll_ctl(Fd_queue[priority].epoll_fd,
+                     EPOLL_CTL_ADD, fd, &event) != 0 && errno != EEXIST)
+        {
+            Alarm(PRINT,
+                  "E_attach_fd: Failed to add Fd_queue[%d].epoll_fd. "
+                  "Error: %d\n", priority, errno);
+            return -1;
+        }
+#else
+        FD_SET( fd, &Fd_mask[fd_type] );
+#endif
+    }
 
     Alarm( EVENTS, "E_attach_fd: fd %d, fd_type %d, code %d, data 0x%x, priority %d Active_priority %d\n",
            fd, fd_type, code, data, priority, Active_priority );
@@ -428,12 +498,23 @@
         {
             if( ( Fd_queue[i].events[j].fd == fd ) &&
                 ( Fd_queue[i].events[j].fd_type == fd_type ) )
             {
+#if defined(HAVE_EPOLL)
+                // Linux kernels prior to 2.6.9 will bomb if a NULL arg
+                // is used with EPOLL_CTL_DEL.
+                struct epoll_event event;
+#endif
                 if (Fd_queue[i].events[j].active)
                     Fd_queue[i].num_active_fds--;
                 Fd_queue[i].num_fds--;
                 Fd_queue[i].events[j] = Fd_queue[i].events[Fd_queue[i].num_fds];
+#if defined(HAVE_EPOLL)
+                Fd_queue[i].epoll_event_map[EPOLL_MAP_INDEX(Fd_queue[i].events[j].fd)] = j;
+                epoll_ctl(Fd_queue[i].epoll_fd, EPOLL_CTL_DEL, fd,
+                          &event);
+#else
                 FD_CLR( fd, &Fd_mask[fd_type] );
+#endif
                 found = 1;
 
                 break;  /* from the j for only */
@@ -463,10 +544,20 @@
         {
             if( ( Fd_queue[i].events[j].fd == fd ) &&
                 ( Fd_queue[i].events[j].fd_type == fd_type ) )
             {
+#if defined(HAVE_EPOLL)
+                // Linux kernels prior to 2.6.9 will bomb if a NULL arg
+                // is used with EPOLL_CTL_DEL.
+                struct epoll_event event;
+#endif
                 if (Fd_queue[i].events[j].active)
                     Fd_queue[i].num_active_fds--;
                 Fd_queue[i].events[j].active = FALSE;
+#if defined(HAVE_EPOLL)
+                epoll_ctl(Fd_queue[i].epoll_fd, EPOLL_CTL_DEL, fd,
+                          &event);
+#else
                 FD_CLR( fd, &Fd_mask[fd_type] );
+#endif
                 found = 1;
 
                 break;  /* from the j for only */
@@ -498,7 +589,26 @@
             if ( !(Fd_queue[i].events[j].active) )
                 Fd_queue[i].num_active_fds++;
             Fd_queue[i].events[j].active = TRUE;
-            if( i >= Active_priority ) FD_SET( fd, &Fd_mask[ fd_type ] );
+            if( i >= Active_priority ) {
+#if defined(HAVE_EPOLL)
+                struct epoll_event event;
+                event.events   = SelectToEpoll[fd_type];
+                event.data.ptr = EPOLL_DATA_PTR(i, fd);
+                Fd_queue[i].epoll_event_map[EPOLL_MAP_INDEX(fd)] = j;
+                if(epoll_ctl(Fd_queue[i].epoll_fd,
+                             EPOLL_CTL_ADD, fd, &event) != 0 &&
+                   errno != EEXIST)
+                {
+                    Alarm(PRINT,
+                          "E_activate_fd: Failed to add "
+                          "Fd_queue[%d].epoll_fd. Error: %d\n",
+                          i, errno);
+                    return -1;
+                }
+#else
+                FD_SET( fd, &Fd_mask[ fd_type ] );
+#endif
+            }
             found = 1;
 
             break;  /* from the j for only */
@@ -511,8 +621,11 @@
 
 int     E_set_active_threshold( int priority )
 {
+    int i;
+#if !defined(HAVE_EPOLL)
     int fd_type;
-    int i,j;
+    int j;
+#endif
 
     if( priority < 0 || priority > NUM_PRIORITY )
     {
@@ -523,18 +636,37 @@
 
     if( priority == Active_priority ) return( priority );
 
     Active_priority = priority;
+#if defined(HAVE_EPOLL)
+    {
+        struct epoll_event event;
+        for(i = 0; i < priority; ++i)
+            epoll_ctl(epoll_fd, EPOLL_CTL_DEL, Fd_queue[i].epoll_fd, &event);
+
+        event.events = EPOLLIN | EPOLLOUT | EPOLLERR;
+
+        for(i = priority; i < NUM_PRIORITY; ++i) {
+            event.data.ptr = &Fd_queue[i];
+            if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, Fd_queue[i].epoll_fd,
+                         &event) != 0 && errno != EEXIST)
+                Alarm(PRINT,
+                      "E_set_active_threshold: Failed to add "
+                      "Fd_queue[%d].epoll_fd. Error: %d\n", i, errno);
+        }
+    }
+#else
     for ( i=0; i < NUM_FDTYPES; i++ )
     {
         FD_ZERO( &Fd_mask[i] );
     }
-
     for( i = priority; i < NUM_PRIORITY; i++ )
         for( j=0; j < Fd_queue[i].num_fds; j++ )
         {
             fd_type = Fd_queue[i].events[j].fd_type;
-            if (Fd_queue[i].events[j].active)
+            if (Fd_queue[i].events[j].active) {
                 FD_SET( Fd_queue[i].events[j].fd, &Fd_mask[fd_type] );
+            }
         }
+#endif
 
     Alarm( EVENTS, "E_set_active_threshold: changed to %d\n",Active_priority);
@@ -551,6 +683,154 @@
     return( Fd_queue[priority].num_active_fds );
 }
 
+#if defined(HAVE_EPOLL)
+
+void    E_handle_events(void)
+{
+    static const sp_time  long_timeout = { 10000, 0};
+#ifdef BADCLOCK
+    static const sp_time  mili_sec     = { 0, 1000};
+    int         clock_sync;
+#endif
+    int         num_set;
+    int         fd;
+    int         fd_type;
+    int         i, j;
+    sp_time     timeout;
+    int         sel_timeout;
+    time_event  *temp_ptr;
+    int         first = 1;
+    struct epoll_event  epoll_events[NUM_PRIORITY];
+    int         ready[NUM_PRIORITY];
+    int         ready_set[NUM_PRIORITY];
+
+#ifdef BADCLOCK
+    clock_sync = 0;
+#endif
+
+    memset(ready_set, 0, sizeof(ready_set));
+
+    for(Exit_events = 0 ; !Exit_events ; )
+    {
+        Alarm( EVENTS, "E_handle_events: next event \n");
+
+        /* Handle time events */
+        timeout = long_timeout;
+
+        while( Time_queue != NULL )
+        {
+#ifdef BADCLOCK
+            if ( clock_sync >= 0 )
+            {
+                E_get_time();
+                clock_sync = -20;
+            }
+#else
+            E_get_time();
+#endif
+            if(!first && E_compare_time( Now, Time_queue->t ) >= 0 )
+            {
+                temp_ptr = Time_queue;
+                Time_queue = Time_queue->next;
+                Alarm( EVENTS, "E_handle_events: exec time event \n");
+                temp_ptr->func( temp_ptr->code, temp_ptr->data );
+                dispose( temp_ptr );
+#ifdef BADCLOCK
+                Now = E_add_time( Now, mili_sec );
+                clock_sync++;
+#else
+                E_get_time();
+#endif
+                if (Exit_events) goto end_handler;
+            } else {
+                timeout = E_sub_time( Time_queue->t, Now );
+                break;
+            }
+        }
+        if (timeout.sec < 0 )
+            timeout.sec = timeout.usec = 0; /* this can happen until first is unset */
+
+        Alarm( EVENTS, "E_handle_events: poll select\n");
+
+        num_set = epoll_wait(epoll_fd, epoll_events, NUM_PRIORITY, 0);
+
+        if (num_set == 0 && !Exit_events)
+        {
+#ifdef BADCLOCK
+            clock_sync = 0;
+#endif
+            Alarm( EVENTS, "E_handle_events: select with timeout (%d, %d)\n",
+                   timeout.sec,timeout.usec );
+#ifdef TESTTIME
+            req_time = E_add_time(req_time, timeout);
+#endif
+
+            sel_timeout = timeout.sec * 1000 + (timeout.usec / 1000);
+            if(sel_timeout == 0 && timeout.usec > 0)
+                sel_timeout = 1;
+            num_set = epoll_wait(epoll_fd, epoll_events, NUM_PRIORITY, sel_timeout);
+        }
+
+        memset(ready, 0, sizeof(ready));
+        for(i = 0; i < num_set; ++i) {
+            fd_queue *q = (fd_queue*)epoll_events[i].data.ptr;
+            ready[q->priority] = 1;
+        }
+
+        /*
+         * Handle high priority events first, then medium priority, then low.
+         * Process only one event for a priority class when a higher priority
+         * class had events processed.  We overload num_set to indicate if
+         * the previous priority level had events processed.
+         */
+        num_set = 0;
+        for(i = HIGH_PRIORITY; i >= LOW_PRIORITY; --i) {
+            if(ready[i]) {
+                int min_set;
+
+                if(ready_set[i] < 1) {
+                    ready_set[i] =
+                        epoll_wait(Fd_queue[i].epoll_fd, Fd_queue[i].epoll_events,
+                                   Fd_queue[i].num_active_fds, 0);
+                }
+
+                if(ready_set[i] > 0 && num_set)
+                    min_set = ready_set[i] - 1;
+                else
+                    min_set = 0;
+
+                for(j = ready_set[i] - 1; j >= min_set; --j) {
+                    int index = EPOLL_EVENT_INDEX(Fd_queue[i].epoll_events[j]);
+
+                    --ready_set[i];
+
+                    fd      = Fd_queue[i].events[index].fd;
+                    fd_type = Fd_queue[i].events[index].fd_type;
+
+                    Alarm(EVENTS,
+                          "E_handle_events: exec handler for fd %d, fd_type %d, "
+                          "priority %d\n", fd, fd_type, i);
+
+                    Fd_queue[i].events[index].func(Fd_queue[i].events[index].fd,
+                                                   Fd_queue[i].events[index].code,
+                                                   Fd_queue[i].events[index].data);
+
+                    num_set = 1;
+                    if (Exit_events) goto end_handler;
+                }
+            }
+        }
+
+        first = 0;
+    }
+end_handler:
+    /* Clean up data structures for exit OR restart of handler loop */
+
+    /* Actually nothing needs to be cleaned up to allow E_handle_events()
+     * to be called again. The events are still registered (or not registered)
+     * and the only state for the actual events loop is Exit_events which is reset
+     * in the for loop.
+     */
+
+    return;
+}
+
+#else
+
 void    E_handle_events(void)
 {
     static int  Round_robin = 0;
@@ -756,6 +1036,7 @@
 
     return;
 }
+#endif
 
 void    E_exit_events(void)
 {
Index: libspread/sp.c
===================================================================
--- libspread/sp.c	(.../spread-4.0.0)	(revision 16)
+++ libspread/sp.c	(.../spread-4.0.0-poll-epoll.4.1)	(revision 16)
@@ -54,6 +54,10 @@
 #include
 #include
+#if defined(HAVE_POLL)
+#include <poll.h>
+#endif
+
 #else	/* ARCH_PC_WIN95 */
 
 #include
@@ -268,7 +272,12 @@
 static int	recv_nointr_timeout(int s, void *buf, size_t len, int flags, sp_time *time_out)
 {
     int ret, num_ready;
+#if defined(HAVE_POLL)
+    struct pollfd pset[1];
+    int poll_time;
+#else
     fd_set rset,fixed_rset;
+#endif
     sp_time start_time, temp_time, target_time, wait_time;
     struct timeval sel_time;
@@ -279,12 +288,28 @@
     start_time = E_get_time();
     target_time = E_add_time(start_time, *time_out);
     wait_time = *time_out;
+    sel_time.tv_sec = wait_time.sec;
+    sel_time.tv_usec = wait_time.usec;
+
+#if defined(HAVE_POLL)
+    pset[0].fd = s;
+    pset[0].events = POLLIN | POLLPRI;
+    pset[0].revents = 0;
+    poll_time =
+        (sel_time.tv_sec * 1000) + (sel_time.tv_usec / 1000);
+    if(poll_time == 0 && sel_time.tv_usec > 0)
+        poll_time = 1;
+    while(((num_ready = poll(pset, 1, poll_time)) == -1) &&
+          ((sock_errno == EINTR) || (sock_errno == EAGAIN) ||
+           (sock_errno == EWOULDBLOCK)) &&
+          !((pset[0].revents & POLLHUP) ||
+            (pset[0].revents & POLLNVAL)))
+#else
     FD_ZERO(&fixed_rset);
     FD_SET(s, &fixed_rset);
     rset = fixed_rset;
-    sel_time.tv_sec = wait_time.sec;
-    sel_time.tv_usec = wait_time.usec;
     while( ((num_ready = select(s+1, &rset, NULL, NULL, &sel_time)) == -1) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
+#endif
     {
         temp_time = E_get_time();
         if (E_compare_time(temp_time, target_time) < 0 ) {
@@ -296,7 +321,9 @@
             sock_set_errno( ERR_TIMEDOUT );
             return(-1);
         }
+#if !defined(HAVE_POLL)
         rset = fixed_rset;
+#endif
     }
     if ( !num_ready ) {
         printf("recv_nointr_timeout: Timed out\n");
@@ -324,7 +351,12 @@
 static int	connect_nointr_timeout(int s, struct sockaddr *sname, socklen_t slen, sp_time *time_out)
 {
     int ret, num_ready;
+#if defined(HAVE_POLL)
+    struct pollfd pset[1];
+    int poll_time;
+#else
     fd_set rset,fixed_rset,wset;
+#endif
     sp_time start_time, temp_time, target_time, wait_time;
     struct timeval sel_time;
     int non_blocking = 0;
@@ -354,6 +386,21 @@
     while( ((ret = connect( s, sname, slen ) ) == -1) &&
            ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK) || (sock_errno == EINPROGRESS)) )
     {
+#if defined(HAVE_POLL)
+        Alarmp( SPLOG_DEBUG, SESSION, "connect_nointr_timeout: connect in progress for socket %d, now wait in poll\n", s);
+        pset[0].fd = s;
+        pset[0].events = POLLIN | POLLPRI | POLLOUT;
+        pset[0].revents = 0;
+        poll_time =
+            (sel_time.tv_sec * 1000) + (sel_time.tv_usec / 1000);
+        if(poll_time == 0 && sel_time.tv_usec > 0)
+            poll_time = 1;
+        /* wait for poll to timeout (num_ready == 0), give a permanent error (num_ready < 0 && sock_errno != transient). If transient error, retry after checking to make sure timeout has not expired */
+        while(((num_ready = poll(pset, 1, poll_time)) == -1) &&
+              ((sock_errno == EINTR) || (sock_errno == EAGAIN) ||
+               (sock_errno == EWOULDBLOCK)) &&
+              !((pset[0].revents & POLLHUP) || (pset[0].revents & POLLNVAL)))
+#else
         FD_ZERO(&fixed_rset);
         FD_SET(s, &fixed_rset);
         rset = fixed_rset;
@@ -361,6 +408,7 @@
         Alarmp( SPLOG_DEBUG, SESSION, "connect_nointr_timeout: connect in progress for socket %d, now wait in select\n", s);
         /* wait for select to timeout (num_ready == 0), give a permanent error (num_ready < 0 && sock_errno != transient). If transient error, retry after checking to make sure timeout has not expired */
         while( ((num_ready = select(s+1, &rset, &wset, NULL, &sel_time)) == -1) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
+#endif
         {
             temp_time = E_get_time();
             if (E_compare_time(temp_time, target_time) < 0 ) {
@@ -374,8 +422,11 @@
                 ret = -1;
                 goto done_connect_try;
             }
+
+#if !defined(HAVE_POLL)
             rset = fixed_rset;
             wset = rset;
+#endif
         }
         if ( num_ready == 0 ) {
             /* timeout */
@@ -389,7 +440,11 @@
             ret = -1;
             break;
         }
+#if defined(HAVE_POLL)
+        else if (pset[0].revents != 0)
+#else
         if (FD_ISSET(s, &rset) || FD_ISSET( s, &wset))
+#endif
         {
             err = 0;
             elen = sizeof(err);
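
Note on the daemon/events.c change: the patch keeps one outer epoll descriptor whose members are the per-priority epoll descriptors, so a single epoll_wait() on the outer descriptor shows which priority queues have ready I/O before each queue is drained on its own. A minimal standalone sketch of that nesting follows (illustrative only, not part of the patch; NPRIO, queue_fd, and the stdin registration are made-up names for the example, and error checks are abbreviated):

#include <stdio.h>
#include <unistd.h>
#include <sys/epoll.h>

#define NPRIO 2                               /* stands in for NUM_PRIORITY */

int main(void)
{
    struct epoll_event ev, hits[NPRIO];
    int outer, queue_fd[NPRIO], i, n;

    outer = epoll_create(NPRIO);              /* outer "which queue is ready" descriptor */
    for (i = 0; i < NPRIO; i++) {
        queue_fd[i] = epoll_create(8);        /* per-priority descriptor */
        ev.events   = EPOLLIN;
        ev.data.u32 = i;                      /* remember which queue fired */
        /* an epoll descriptor may itself be watched by another epoll set */
        epoll_ctl(outer, EPOLL_CTL_ADD, queue_fd[i], &ev);
    }

    /* register stdin with queue 0 so the example has something to watch */
    ev.events  = EPOLLIN;
    ev.data.fd = STDIN_FILENO;
    epoll_ctl(queue_fd[0], EPOLL_CTL_ADD, STDIN_FILENO, &ev);

    /* step 1: which queues are ready?  step 2: drain each ready queue. */
    n = epoll_wait(outer, hits, NPRIO, 5000);
    for (i = 0; i < n; i++)
        printf("queue %u has ready descriptors\n", (unsigned)hits[i].data.u32);

    for (i = 0; i < NPRIO; i++)
        close(queue_fd[i]);
    close(outer);
    return 0;
}

Because epoll only returns the data word stored at EPOLL_CTL_ADD time, the patch additionally stores a pointer into epoll_event_map in each event's data field, which is how EPOLL_EVENT_INDEX() recovers the slot in Fd_queue[i].events[] even after E_detach_fd() compacts that array.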
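Note on the libspread/sp.c change: both HAVE_POLL paths convert the select()-style struct timeval into poll()'s millisecond timeout, rounding a sub-millisecond wait up to 1 ms so it is not mistaken for "return immediately", and retry only on transient errors. A minimal sketch of that conversion, assuming sock_errno maps to errno on the platform (wait_readable is an illustrative helper, not a Spread API):

#include <errno.h>
#include <poll.h>
#include <sys/time.h>

/* Wait until descriptor s is readable or the timeval expires.
 * Returns >0 if ready, 0 on timeout, -1 on a permanent error. */
static int wait_readable(int s, const struct timeval *tv)
{
    struct pollfd pset[1];
    int poll_time, num_ready;

    pset[0].fd      = s;
    pset[0].events  = POLLIN | POLLPRI;
    pset[0].revents = 0;

    /* seconds/microseconds -> milliseconds; round a small non-zero
     * timeout up to 1 ms instead of truncating it to zero */
    poll_time = (tv->tv_sec * 1000) + (tv->tv_usec / 1000);
    if (poll_time == 0 && tv->tv_usec > 0)
        poll_time = 1;

    /* retry on transient errors only; a real caller (as in
     * recv_nointr_timeout) also re-checks the overall deadline */
    while ((num_ready = poll(pset, 1, poll_time)) == -1 &&
           (errno == EINTR || errno == EAGAIN))
        ;

    return num_ready;
}

The patch's retry loops also stop as soon as revents reports POLLHUP or POLLNVAL, since those conditions will not clear by waiting again.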