diff options
Diffstat (limited to 'arch/ia64/sn/kernel/xpc_channel.c')
-rw-r--r-- | arch/ia64/sn/kernel/xpc_channel.c | 329 |
1 files changed, 195 insertions, 134 deletions
diff --git a/arch/ia64/sn/kernel/xpc_channel.c b/arch/ia64/sn/kernel/xpc_channel.c index 94698be..abf4fc2 100644 --- a/arch/ia64/sn/kernel/xpc_channel.c +++ b/arch/ia64/sn/kernel/xpc_channel.c @@ -57,6 +57,7 @@ xpc_initialize_channels(struct xpc_partition *part, partid_t partid) spin_lock_init(&ch->lock); sema_init(&ch->msg_to_pull_sema, 1); /* mutex */ + sema_init(&ch->wdisconnect_sema, 0); /* event wait */ atomic_set(&ch->n_on_msg_allocate_wq, 0); init_waitqueue_head(&ch->msg_allocate_wq); @@ -166,6 +167,7 @@ xpc_setup_infrastructure(struct xpc_partition *part) xpc_initialize_channels(part, partid); atomic_set(&part->nchannels_active, 0); + atomic_set(&part->nchannels_engaged, 0); /* local_IPI_amo were set to 0 by an earlier memset() */ @@ -555,8 +557,6 @@ xpc_allocate_msgqueues(struct xpc_channel *ch) sema_init(&ch->notify_queue[i].sema, 0); } - sema_init(&ch->teardown_sema, 0); /* event wait */ - spin_lock_irqsave(&ch->lock, irq_flags); ch->flags |= XPC_C_SETUP; spin_unlock_irqrestore(&ch->lock, irq_flags); @@ -626,6 +626,55 @@ xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags) /* + * Notify those who wanted to be notified upon delivery of their message. + */ +static void +xpc_notify_senders(struct xpc_channel *ch, enum xpc_retval reason, s64 put) +{ + struct xpc_notify *notify; + u8 notify_type; + s64 get = ch->w_remote_GP.get - 1; + + + while (++get < put && atomic_read(&ch->n_to_notify) > 0) { + + notify = &ch->notify_queue[get % ch->local_nentries]; + + /* + * See if the notify entry indicates it was associated with + * a message who's sender wants to be notified. It is possible + * that it is, but someone else is doing or has done the + * notification. + */ + notify_type = notify->type; + if (notify_type == 0 || + cmpxchg(¬ify->type, notify_type, 0) != + notify_type) { + continue; + } + + DBUG_ON(notify_type != XPC_N_CALL); + + atomic_dec(&ch->n_to_notify); + + if (notify->func != NULL) { + dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, " + "msg_number=%ld, partid=%d, channel=%d\n", + (void *) notify, get, ch->partid, ch->number); + + notify->func(reason, ch->partid, ch->number, + notify->key); + + dev_dbg(xpc_chan, "notify->func() returned, " + "notify=0x%p, msg_number=%ld, partid=%d, " + "channel=%d\n", (void *) notify, get, + ch->partid, ch->number); + } + } +} + + +/* * Free up message queues and other stuff that were allocated for the specified * channel. * @@ -669,9 +718,6 @@ xpc_free_msgqueues(struct xpc_channel *ch) ch->remote_msgqueue = NULL; kfree(ch->notify_queue); ch->notify_queue = NULL; - - /* in case someone is waiting for the teardown to complete */ - up(&ch->teardown_sema); } } @@ -683,7 +729,7 @@ static void xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags) { struct xpc_partition *part = &xpc_partitions[ch->partid]; - u32 ch_flags = ch->flags; + u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED); DBUG_ON(!spin_is_locked(&ch->lock)); @@ -701,12 +747,13 @@ xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags) } DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0); - /* it's now safe to free the channel's message queues */ - - xpc_free_msgqueues(ch); - DBUG_ON(ch->flags & XPC_C_SETUP); + if (part->act_state == XPC_P_DEACTIVATING) { + /* can't proceed until the other side disengages from us */ + if (xpc_partition_engaged(1UL << ch->partid)) { + return; + } - if (part->act_state != XPC_P_DEACTIVATING) { + } else { /* as long as the other side is up do the full protocol */ @@ -724,16 +771,42 @@ xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags) } } + /* wake those waiting for notify completion */ + if (atomic_read(&ch->n_to_notify) > 0) { + /* >>> we do callout while holding ch->lock */ + xpc_notify_senders(ch, ch->reason, ch->w_local_GP.put); + } + /* both sides are disconnected now */ - ch->flags = XPC_C_DISCONNECTED; /* clear all flags, but this one */ + /* it's now safe to free the channel's message queues */ + xpc_free_msgqueues(ch); + + /* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */ + ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT)); atomic_dec(&part->nchannels_active); - if (ch_flags & XPC_C_WASCONNECTED) { + if (channel_was_connected) { dev_info(xpc_chan, "channel %d to partition %d disconnected, " "reason=%d\n", ch->number, ch->partid, ch->reason); } + + if (ch->flags & XPC_C_WDISCONNECT) { + spin_unlock_irqrestore(&ch->lock, *irq_flags); + up(&ch->wdisconnect_sema); + spin_lock_irqsave(&ch->lock, *irq_flags); + + } else if (ch->delayed_IPI_flags) { + if (part->act_state != XPC_P_DEACTIVATING) { + /* time to take action on any delayed IPI flags */ + spin_lock(&part->IPI_lock); + XPC_SET_IPI_FLAGS(part->local_IPI_amo, ch->number, + ch->delayed_IPI_flags); + spin_unlock(&part->IPI_lock); + } + ch->delayed_IPI_flags = 0; + } } @@ -754,6 +827,19 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, spin_lock_irqsave(&ch->lock, irq_flags); +again: + + if ((ch->flags & XPC_C_DISCONNECTED) && + (ch->flags & XPC_C_WDISCONNECT)) { + /* + * Delay processing IPI flags until thread waiting disconnect + * has had a chance to see that the channel is disconnected. + */ + ch->delayed_IPI_flags |= IPI_flags; + spin_unlock_irqrestore(&ch->lock, irq_flags); + return; + } + if (IPI_flags & XPC_IPI_CLOSEREQUEST) { @@ -764,7 +850,7 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, /* * If RCLOSEREQUEST is set, we're probably waiting for * RCLOSEREPLY. We should find it and a ROPENREQUEST packed - * with this RCLOSEQREUQEST in the IPI_flags. + * with this RCLOSEREQUEST in the IPI_flags. */ if (ch->flags & XPC_C_RCLOSEREQUEST) { @@ -779,14 +865,22 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, /* both sides have finished disconnecting */ xpc_process_disconnect(ch, &irq_flags); + DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED)); + goto again; } if (ch->flags & XPC_C_DISCONNECTED) { - // >>> explain this section - if (!(IPI_flags & XPC_IPI_OPENREQUEST)) { - DBUG_ON(part->act_state != - XPC_P_DEACTIVATING); + if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, + ch_number) & XPC_IPI_OPENREQUEST)) { + + DBUG_ON(ch->delayed_IPI_flags != 0); + spin_lock(&part->IPI_lock); + XPC_SET_IPI_FLAGS(part->local_IPI_amo, + ch_number, + XPC_IPI_CLOSEREQUEST); + spin_unlock(&part->IPI_lock); + } spin_unlock_irqrestore(&ch->lock, irq_flags); return; } @@ -816,9 +910,13 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, } XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags); - } else { - xpc_process_disconnect(ch, &irq_flags); + + DBUG_ON(IPI_flags & XPC_IPI_CLOSEREPLY); + spin_unlock_irqrestore(&ch->lock, irq_flags); + return; } + + xpc_process_disconnect(ch, &irq_flags); } @@ -834,7 +932,20 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, } DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST)); - DBUG_ON(!(ch->flags & XPC_C_RCLOSEREQUEST)); + + if (!(ch->flags & XPC_C_RCLOSEREQUEST)) { + if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, ch_number) + & XPC_IPI_CLOSEREQUEST)) { + + DBUG_ON(ch->delayed_IPI_flags != 0); + spin_lock(&part->IPI_lock); + XPC_SET_IPI_FLAGS(part->local_IPI_amo, + ch_number, XPC_IPI_CLOSEREPLY); + spin_unlock(&part->IPI_lock); + } + spin_unlock_irqrestore(&ch->lock, irq_flags); + return; + } ch->flags |= XPC_C_RCLOSEREPLY; @@ -852,8 +963,14 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, "channel=%d\n", args->msg_size, args->local_nentries, ch->partid, ch->number); - if ((ch->flags & XPC_C_DISCONNECTING) || - part->act_state == XPC_P_DEACTIVATING) { + if (part->act_state == XPC_P_DEACTIVATING || + (ch->flags & XPC_C_ROPENREQUEST)) { + spin_unlock_irqrestore(&ch->lock, irq_flags); + return; + } + + if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) { + ch->delayed_IPI_flags |= XPC_IPI_OPENREQUEST; spin_unlock_irqrestore(&ch->lock, irq_flags); return; } @@ -867,8 +984,11 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, * msg_size = size of channel's messages in bytes * local_nentries = remote partition's local_nentries */ - DBUG_ON(args->msg_size == 0); - DBUG_ON(args->local_nentries == 0); + if (args->msg_size == 0 || args->local_nentries == 0) { + /* assume OPENREQUEST was delayed by mistake */ + spin_unlock_irqrestore(&ch->lock, irq_flags); + return; + } ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING); ch->remote_nentries = args->local_nentries; @@ -906,7 +1026,13 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, spin_unlock_irqrestore(&ch->lock, irq_flags); return; } - DBUG_ON(!(ch->flags & XPC_C_OPENREQUEST)); + if (!(ch->flags & XPC_C_OPENREQUEST)) { + XPC_DISCONNECT_CHANNEL(ch, xpcOpenCloseError, + &irq_flags); + spin_unlock_irqrestore(&ch->lock, irq_flags); + return; + } + DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST)); DBUG_ON(ch->flags & XPC_C_CONNECTED); @@ -960,8 +1086,8 @@ xpc_connect_channel(struct xpc_channel *ch) struct xpc_registration *registration = &xpc_registrations[ch->number]; - if (down_interruptible(®istration->sema) != 0) { - return xpcInterrupted; + if (down_trylock(®istration->sema) != 0) { + return xpcRetry; } if (!XPC_CHANNEL_REGISTERED(ch->number)) { @@ -1040,55 +1166,6 @@ xpc_connect_channel(struct xpc_channel *ch) /* - * Notify those who wanted to be notified upon delivery of their message. - */ -static void -xpc_notify_senders(struct xpc_channel *ch, enum xpc_retval reason, s64 put) -{ - struct xpc_notify *notify; - u8 notify_type; - s64 get = ch->w_remote_GP.get - 1; - - - while (++get < put && atomic_read(&ch->n_to_notify) > 0) { - - notify = &ch->notify_queue[get % ch->local_nentries]; - - /* - * See if the notify entry indicates it was associated with - * a message who's sender wants to be notified. It is possible - * that it is, but someone else is doing or has done the - * notification. - */ - notify_type = notify->type; - if (notify_type == 0 || - cmpxchg(¬ify->type, notify_type, 0) != - notify_type) { - continue; - } - - DBUG_ON(notify_type != XPC_N_CALL); - - atomic_dec(&ch->n_to_notify); - - if (notify->func != NULL) { - dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, " - "msg_number=%ld, partid=%d, channel=%d\n", - (void *) notify, get, ch->partid, ch->number); - - notify->func(reason, ch->partid, ch->number, - notify->key); - - dev_dbg(xpc_chan, "notify->func() returned, " - "notify=0x%p, msg_number=%ld, partid=%d, " - "channel=%d\n", (void *) notify, get, - ch->partid, ch->number); - } - } -} - - -/* * Clear some of the msg flags in the local message queue. */ static inline void @@ -1240,6 +1317,7 @@ xpc_process_channel_activity(struct xpc_partition *part) u64 IPI_amo, IPI_flags; struct xpc_channel *ch; int ch_number; + u32 ch_flags; IPI_amo = xpc_get_IPI_flags(part); @@ -1266,8 +1344,9 @@ xpc_process_channel_activity(struct xpc_partition *part) xpc_process_openclose_IPI(part, ch_number, IPI_flags); } + ch_flags = ch->flags; /* need an atomic snapshot of flags */ - if (ch->flags & XPC_C_DISCONNECTING) { + if (ch_flags & XPC_C_DISCONNECTING) { spin_lock_irqsave(&ch->lock, irq_flags); xpc_process_disconnect(ch, &irq_flags); spin_unlock_irqrestore(&ch->lock, irq_flags); @@ -1278,9 +1357,9 @@ xpc_process_channel_activity(struct xpc_partition *part) continue; } - if (!(ch->flags & XPC_C_CONNECTED)) { - if (!(ch->flags & XPC_C_OPENREQUEST)) { - DBUG_ON(ch->flags & XPC_C_SETUP); + if (!(ch_flags & XPC_C_CONNECTED)) { + if (!(ch_flags & XPC_C_OPENREQUEST)) { + DBUG_ON(ch_flags & XPC_C_SETUP); (void) xpc_connect_channel(ch); } else { spin_lock_irqsave(&ch->lock, irq_flags); @@ -1305,8 +1384,8 @@ xpc_process_channel_activity(struct xpc_partition *part) /* - * XPC's heartbeat code calls this function to inform XPC that a partition has - * gone down. XPC responds by tearing down the XPartition Communication + * XPC's heartbeat code calls this function to inform XPC that a partition is + * going down. XPC responds by tearing down the XPartition Communication * infrastructure used for the just downed partition. * * XPC's heartbeat code will never call this function and xpc_partition_up() @@ -1314,7 +1393,7 @@ xpc_process_channel_activity(struct xpc_partition *part) * at the same time. */ void -xpc_partition_down(struct xpc_partition *part, enum xpc_retval reason) +xpc_partition_going_down(struct xpc_partition *part, enum xpc_retval reason) { unsigned long irq_flags; int ch_number; @@ -1330,12 +1409,11 @@ xpc_partition_down(struct xpc_partition *part, enum xpc_retval reason) } - /* disconnect all channels associated with the downed partition */ + /* disconnect channels associated with the partition going down */ for (ch_number = 0; ch_number < part->nchannels; ch_number++) { ch = &part->channels[ch_number]; - xpc_msgqueue_ref(ch); spin_lock_irqsave(&ch->lock, irq_flags); @@ -1370,6 +1448,7 @@ xpc_teardown_infrastructure(struct xpc_partition *part) * this partition. */ + DBUG_ON(atomic_read(&part->nchannels_engaged) != 0); DBUG_ON(atomic_read(&part->nchannels_active) != 0); DBUG_ON(part->setup_state != XPC_P_SETUP); part->setup_state = XPC_P_WTEARDOWN; @@ -1428,19 +1507,11 @@ xpc_initiate_connect(int ch_number) if (xpc_part_ref(part)) { ch = &part->channels[ch_number]; - if (!(ch->flags & XPC_C_DISCONNECTING)) { - DBUG_ON(ch->flags & XPC_C_OPENREQUEST); - DBUG_ON(ch->flags & XPC_C_CONNECTED); - DBUG_ON(ch->flags & XPC_C_SETUP); - - /* - * Initiate the establishment of a connection - * on the newly registered channel to the - * remote partition. - */ - xpc_wakeup_channel_mgr(part); - } - + /* + * Initiate the establishment of a connection on the + * newly registered channel to the remote partition. + */ + xpc_wakeup_channel_mgr(part); xpc_part_deref(part); } } @@ -1450,9 +1521,6 @@ xpc_initiate_connect(int ch_number) void xpc_connected_callout(struct xpc_channel *ch) { - unsigned long irq_flags; - - /* let the registerer know that a connection has been established */ if (ch->func != NULL) { @@ -1465,10 +1533,6 @@ xpc_connected_callout(struct xpc_channel *ch) dev_dbg(xpc_chan, "ch->func() returned, reason=xpcConnected, " "partid=%d, channel=%d\n", ch->partid, ch->number); } - - spin_lock_irqsave(&ch->lock, irq_flags); - ch->flags |= XPC_C_CONNECTCALLOUT; - spin_unlock_irqrestore(&ch->lock, irq_flags); } @@ -1506,8 +1570,12 @@ xpc_initiate_disconnect(int ch_number) spin_lock_irqsave(&ch->lock, irq_flags); - XPC_DISCONNECT_CHANNEL(ch, xpcUnregistering, + if (!(ch->flags & XPC_C_DISCONNECTED)) { + ch->flags |= XPC_C_WDISCONNECT; + + XPC_DISCONNECT_CHANNEL(ch, xpcUnregistering, &irq_flags); + } spin_unlock_irqrestore(&ch->lock, irq_flags); @@ -1523,8 +1591,9 @@ xpc_initiate_disconnect(int ch_number) /* * To disconnect a channel, and reflect it back to all who may be waiting. * - * >>> An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by - * >>> xpc_free_msgqueues(). + * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by + * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by + * xpc_disconnect_wait(). * * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN. */ @@ -1532,7 +1601,7 @@ void xpc_disconnect_channel(const int line, struct xpc_channel *ch, enum xpc_retval reason, unsigned long *irq_flags) { - u32 flags; + u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED); DBUG_ON(!spin_is_locked(&ch->lock)); @@ -1547,61 +1616,53 @@ xpc_disconnect_channel(const int line, struct xpc_channel *ch, XPC_SET_REASON(ch, reason, line); - flags = ch->flags; + ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING); /* some of these may not have been set */ ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY | XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY | XPC_C_CONNECTING | XPC_C_CONNECTED); - ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING); xpc_IPI_send_closerequest(ch, irq_flags); - if (flags & XPC_C_CONNECTED) { + if (channel_was_connected) { ch->flags |= XPC_C_WASCONNECTED; } + spin_unlock_irqrestore(&ch->lock, *irq_flags); + + /* wake all idle kthreads so they can exit */ if (atomic_read(&ch->kthreads_idle) > 0) { - /* wake all idle kthreads so they can exit */ wake_up_all(&ch->idle_wq); } - spin_unlock_irqrestore(&ch->lock, *irq_flags); - - /* wake those waiting to allocate an entry from the local msg queue */ - if (atomic_read(&ch->n_on_msg_allocate_wq) > 0) { wake_up(&ch->msg_allocate_wq); } - /* wake those waiting for notify completion */ - - if (atomic_read(&ch->n_to_notify) > 0) { - xpc_notify_senders(ch, reason, ch->w_local_GP.put); - } - spin_lock_irqsave(&ch->lock, *irq_flags); } void -xpc_disconnected_callout(struct xpc_channel *ch) +xpc_disconnecting_callout(struct xpc_channel *ch) { /* - * Let the channel's registerer know that the channel is now + * Let the channel's registerer know that the channel is being * disconnected. We don't want to do this if the registerer was never - * informed of a connection being made, unless the disconnect was for - * abnormal reasons. + * informed of a connection being made. */ if (ch->func != NULL) { - dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, " - "channel=%d\n", ch->reason, ch->partid, ch->number); + dev_dbg(xpc_chan, "ch->func() called, reason=xpcDisconnecting," + " partid=%d, channel=%d\n", ch->partid, ch->number); - ch->func(ch->reason, ch->partid, ch->number, NULL, ch->key); + ch->func(xpcDisconnecting, ch->partid, ch->number, NULL, + ch->key); - dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, " - "channel=%d\n", ch->reason, ch->partid, ch->number); + dev_dbg(xpc_chan, "ch->func() returned, reason=" + "xpcDisconnecting, partid=%d, channel=%d\n", + ch->partid, ch->number); } } @@ -1848,7 +1909,7 @@ xpc_send_msg(struct xpc_channel *ch, struct xpc_msg *msg, u8 notify_type, xpc_notify_func func, void *key) { enum xpc_retval ret = xpcSuccess; - struct xpc_notify *notify = NULL; // >>> to keep the compiler happy!! + struct xpc_notify *notify = notify; s64 put, msg_number = msg->number; |