aoe: replace the aoe_ktio and aoe_tx kthreads with per-CPU work items

I/O completions (aoecmd.c) and outgoing skbs (aoenet.c) are queued on a
per-CPU list and handled by a work item instead of a dedicated kthread.
The ktstate helpers (aoe_ktstart/aoe_ktstop) are removed.  The module
parameters aoe_maxioc and aoe_maxtx bound the number of items one work
run handles; 0, the default, means no limit.  VERSION is tagged
"70maxioc_qpcpu" and AOEDBG_ACTIVE is set to 1.

Changes made in review of the first version of this patch:

 - Every per-CPU queue embeds its own work_struct and the handler finds
   its queue with container_of().  With one shared work item,
   schedule_work() is a no-op while the item is already pending and the
   handler only drained the queue of the CPU it ran on, so frames or
   skbs queued on another CPU could be left unserviced.
 - The aoe_maxioc/aoe_maxtx limit is tested as "i + 1 >= limit".  The
   previous "i == limit" allowed limit + 1 items per run and never
   triggered if the limit was lowered below i while the loop ran.
 - aoecmd_exit() and aoenet_exit() cancel the work items before the
   queues are flushed.
 - Comments that still described the kthread/lock scheme are updated and
   a commented-out assignment in aoenet_xmit() is dropped.

The index lines for aoecmd.c and aoenet.c are omitted because the
post-image of those files differs from the first version of the patch.
Indentation and blank context lines were reconstructed from the hunk
line counts; if a hunk is rejected on whitespace, apply with "patch -l".

diff --git a/linux/drivers/block/aoe/aoe.h b/linux/drivers/block/aoe/aoe.h
index abb15af..a38a3d4 100644
--- a/linux/drivers/block/aoe/aoe.h
+++ b/linux/drivers/block/aoe/aoe.h
@@ -1,5 +1,5 @@
 /* Copyright (c) 2008 Coraid, Inc. See COPYING for GPL terms. */
-#define VERSION "70"
+#define VERSION "70maxioc_qpcpu"
 #define AOE_MAJOR 152
 #define DEVICE_NAME "aoe"
 
@@ -194,16 +194,6 @@ struct aoedev {
 	char ident[512];
 };
 
-/* kthread tracking */
-struct ktstate {
-	struct completion rendez;
-	struct task_struct *task;
-	wait_queue_head_t *waitq;
-	void (*fn) (void);
-	char *name;
-	spinlock_t *lock;
-};
-
 int aoeblk_init(void);
 void aoeblk_exit(void);
 void aoeblk_gdalloc(void *);
@@ -228,8 +218,6 @@ void aoe_bio_pagedec(struct bio *);
 void aoe_bio_pageinc(struct bio *);
 void aoe_flush_iocq(void);
 void aoe_end_request(struct aoedev *, struct request *, int);
-void aoe_ktstop(struct ktstate *);
-int aoe_ktstart(struct ktstate *);
 void aoe_freetframe(struct frame *);
 struct list_head *list_head_el(struct list_head *);
 
@@ -253,7 +241,7 @@ unsigned long long mac_addr(char addr[6]);
  */
 extern void *aoe_kcalloc(size_t, size_t, int);
 
-#define AOEDBG_ACTIVE 0
+#define AOEDBG_ACTIVE 1
 void __init aoedbg_init(void);
 void aoedbg_print(char *fmt, ...);
 void aoedbg_rtt(struct aoedev *d, struct aoe_hdr *h, struct frame *f, int rtt);
diff --git a/linux/drivers/block/aoe/aoecmd.c b/linux/drivers/block/aoe/aoecmd.c
--- a/linux/drivers/block/aoe/aoecmd.c
+++ b/linux/drivers/block/aoe/aoecmd.c
@@ -29,14 +29,19 @@ module_param(aoe_maxout, int, 0644);
 MODULE_PARM_DESC(aoe_maxout,
 	"Only aoe_maxout outstanding packets for every MAC on eX.Y.");
 
-static wait_queue_head_t ktiowq;
-static struct ktstate kts;
+/* Completion budget per work run; 0 (the default) means no limit. */
+static int aoe_maxioc;
+module_param(aoe_maxioc, int, 0644);
+MODULE_PARM_DESC(aoe_maxioc, "When nonzero, perform at most aoe_maxioc I/O completions before scheduling.");
 
 /* io completion queue */
-static struct {
+struct iocq {
 	struct list_head head;
 	spinlock_t lock;
-} iocq;
+	struct work_struct work;	/* runs ktio() on this queue */
+};
+static DEFINE_PER_CPU(struct iocq, iocq);
+static int iocq_stop;
 
 /* empty_zero_page is not always exported */
 static const char *empty_page;
@@ -1232,86 +1237,90 @@ out:
 	dev_kfree_skb(skb);
 }
 
-// enters with iocq.lock held
-static void
-ktio(void)
+/*
+ * Return the first frame queued on q, or NULL when q is empty.
+ * The frame is unlinked from q only when do_pop is nonzero.
+ */
+static struct frame *
+ktio_check(struct iocq *q, int do_pop)
 {
-	struct frame *f;
+	unsigned long flags;
 	struct list_head *pos;
+	struct frame *f;
 
-	while ((pos = list_head_el(&iocq.head))) {
+	spin_lock_irqsave(&q->lock, flags);
+	pos = list_head_el(&q->head);
+	if (pos && do_pop)
 		list_del(pos);
-		spin_unlock_irq(&iocq.lock);
-		f = list_entry(pos, struct frame, head);
-		ktiocomplete(f);
-		spin_lock_irq(&iocq.lock);
-	}
-}
+	spin_unlock_irqrestore(&q->lock, flags);
 
-static int
-kthread(void *vp)
-{
-	struct ktstate *k;
-	DECLARE_WAITQUEUE(wait, current);
-	sigset_t blocked;
-
-	k = vp;
-#ifdef PF_NOFREEZE
-	current->flags |= PF_NOFREEZE;
-#endif
-	set_user_nice(current, -10);
-	sigfillset(&blocked);
-	sigprocmask(SIG_BLOCK, &blocked, NULL);
-	flush_signals(current);
-	complete(&k->rendez);
-	do {
-		__set_current_state(TASK_UNINTERRUPTIBLE);
-		spin_lock_irq(k->lock);
-		k->fn();
-		add_wait_queue(k->waitq, &wait);
-		__set_current_state(TASK_INTERRUPTIBLE);
-		spin_unlock_irq(k->lock);
-		schedule();
-		remove_wait_queue(k->waitq, &wait);
-	} while (!kthread_should_stop());
-	__set_current_state(TASK_RUNNING);
-	complete(&k->rendez);
-	return 0;
-}
+	if (!pos)
+		return NULL;
+	f = list_entry(pos, struct frame, head);
 
-void
-aoe_ktstop(struct ktstate *k)
-{
-	kthread_stop(k->task);
-	wait_for_completion(&k->rendez);
+	return f;
 }
 
-int
-aoe_ktstart(struct ktstate *k)
+static struct frame *
+ktio_pop(struct iocq *q) { return ktio_check(q, 1); }
+
+static struct frame *
+ktio_peek(struct iocq *q) { return ktio_check(q, 0); }
+
+/*
+ * Work handler: complete the frames queued on the iocq that embeds work.
+ * When aoe_maxioc is positive, stop after that many completions and
+ * requeue the work if frames remain.
+ */
+static void
+ktio(struct work_struct *work)
 {
-	struct task_struct *task;
-
-	init_completion(&k->rendez);
-	task = kthread_run(kthread, k, k->name);
-	if (task == NULL || IS_ERR(task))
-		return -EFAULT;
-	k->task = task;
-	wait_for_completion(&k->rendez);
-	init_completion(&k->rendez); // for exit
-	return 0;
+	struct iocq *q = container_of(work, struct iocq, work);
+	struct frame *f;
+	int i;
+	int cpu = get_cpu(); /* XXXdebug */
+
+	aoedbg_print("%s: running on cpu %d\n",
+		__func__, cpu);
+	f = ktio_pop(q);
+	for (i = 0; f; ++i, f = ktio_pop(q)) {
+		aoedbg_print("%s: completing lba 0x%016llx on cpu %d for e%ld.%d\n",
+			__func__, (unsigned long long) f->lba,
+			cpu, (long) f->t->d->aoemajor, f->t->d->aoeminor);
+
+		ktiocomplete(f);
+		/* i + 1 frames are done; >= tolerates a lowered limit */
+		if (aoe_maxioc > 0 && i + 1 >= aoe_maxioc) {
+			if (ktio_peek(q)) {
+				aoedbg_print("%s: scheduling iocq work on cpu %d\n",
+					__func__, cpu);
+				schedule_work(work);
+			}
+			break;
+		}
+	}
+	put_cpu(); /* XXXdebug */
 }
 
-// pass it off to kthreads for processing
+// queue the frame on this CPU's iocq and schedule that queue's work
 static void
 ktcomplete(struct frame *f, struct sk_buff *skb)
 {
 	ulong flags;
+	struct iocq *q;
+	int cpu = get_cpu(); /* XXXdebug */
 
 	f->r_skb = skb;
-	spin_lock_irqsave(&iocq.lock, flags);
-	list_add_tail(&f->head, &iocq.head);
-	spin_unlock_irqrestore(&iocq.lock, flags);
-	wake_up(&ktiowq);
+	if (iocq_stop)
+		; /* XXXfixme: need rendezvous */
+	q = &__get_cpu_var(iocq); /* XXXdebug */
+	spin_lock_irqsave(&q->lock, flags);
+	list_add_tail(&f->head, &q->head);
+	spin_unlock_irqrestore(&q->lock, flags);
+	aoedbg_print("%s: scheduling iocq work on cpu %d\n",
+		__func__, cpu);
+	schedule_work(&q->work);
+	put_cpu();
 }
 
 struct sk_buff *
@@ -1660,50 +1669,62 @@ aoe_flush_iocq(void)
 	struct list_head *pos;
 	struct sk_buff *skb;
 	ulong flags;
-
-	spin_lock_irqsave(&iocq.lock, flags);
-	list_splice(&iocq.head, &flist);
-	INIT_LIST_HEAD(&iocq.head);
-	spin_unlock_irqrestore(&iocq.lock, flags);
-	while ((pos = list_head_el(&flist))) {
-		list_del(pos);
-		f = list_entry(pos, struct frame, head);
-		d = f->t->d;
-		skb = f->r_skb;
-		spin_lock_irqsave(&d->lock, flags);
-		if (f->buf) {
-			f->buf->nframesout--;
-			aoe_failbuf(d, f->buf);
+	int cpu;
+	struct iocq *q;
+
+	for_each_cpu(cpu, cpu_possible_mask) {
+		q = &per_cpu(iocq, cpu);
+		spin_lock_irqsave(&q->lock, flags);
+		list_splice(&q->head, &flist);
+		INIT_LIST_HEAD(&q->head);
+		spin_unlock_irqrestore(&q->lock, flags);
+		while ((pos = list_head_el(&flist))) {
+			list_del(pos);
+			f = list_entry(pos, struct frame, head);
+			d = f->t->d;
+			skb = f->r_skb;
+			spin_lock_irqsave(&d->lock, flags);
+			if (f->buf) {
+				f->buf->nframesout--;
+				aoe_failbuf(d, f->buf);
+			}
+			aoe_freetframe(f);
+			spin_unlock_irqrestore(&d->lock, flags);
+			dev_kfree_skb(skb);
+			aoedev_put(d);
 		}
-		aoe_freetframe(f);
-		spin_unlock_irqrestore(&d->lock, flags);
-		dev_kfree_skb(skb);
-		aoedev_put(d);
 	}
 }
 
 int __init
 aoecmd_init(void)
 {
+	int cpu;
+	struct iocq *q;
+
 	empty_page = aoe_kcalloc(2, PAGE_SIZE, GFP_KERNEL | __GFP_REPEAT);
 	if (!empty_page)
 		return -ENOMEM;
 	atomic_inc(&aoe_zero_page()->_count);
 
-	INIT_LIST_HEAD(&iocq.head);
-	spin_lock_init(&iocq.lock);
-	init_waitqueue_head(&ktiowq);
-	kts.name = "aoe_ktio";
-	kts.fn = ktio;
-	kts.waitq = &ktiowq;
-	kts.lock = &iocq.lock;
-	return aoe_ktstart(&kts);
+	for_each_cpu(cpu, cpu_possible_mask) {
+		q = &per_cpu(iocq, cpu);
+		INIT_LIST_HEAD(&q->head);
+		spin_lock_init(&q->lock);
+		INIT_WORK(&q->work, ktio);
+	}
+	return 0;
 }
 
 void
 aoecmd_exit(void)
 {
-	aoe_ktstop(&kts);
+	int cpu;
+
+	iocq_stop = 1;
+	/* stop the completion work before failing what is still queued */
+	for_each_cpu(cpu, cpu_possible_mask)
+		cancel_work_sync(&per_cpu(iocq, cpu).work);
 	aoe_flush_iocq();
 
 	atomic_dec(&aoe_zero_page()->_count);
diff --git a/linux/drivers/block/aoe/aoenet.c b/linux/drivers/block/aoe/aoenet.c
--- a/linux/drivers/block/aoe/aoenet.c
+++ b/linux/drivers/block/aoe/aoenet.c
@@ -32,9 +32,6 @@ static char aoe_iflist[IFLISTSZ];
 module_param_string(aoe_iflist, aoe_iflist, IFLISTSZ, 0600);
 MODULE_PARM_DESC(aoe_iflist, "aoe_iflist=\"dev1 [dev2 ...]\"\n");
 
-static wait_queue_head_t txwq;
-static struct ktstate kts;
-
 #ifndef MODULE
 static int __init aoe_iflist_setup(char *str)
 {
@@ -46,24 +43,80 @@ static int __init aoe_iflist_setup(char *str)
 __setup("aoe_iflist=", aoe_iflist_setup);
 #endif
 
-static spinlock_t txlock;
-static struct sk_buff_head skbtxq;
+/* transmit queue, one per possible CPU; each has its own work item */
+struct txq {
+	spinlock_t lock;
+	struct sk_buff_head q;
+	struct work_struct work;
+};
+static DEFINE_PER_CPU(struct txq, txq);
+
+/* Transmit budget per work run; 0 (the default) means no limit. */
+static int aoe_maxtx; /* XXXdebug */
+module_param(aoe_maxtx, int, 0644);
+MODULE_PARM_DESC(aoe_maxtx,
+	"When nonzero, perform at most aoe_maxtx transmissions before scheduling.");
+
+/*
+ * Return the first skb queued on q, or NULL when q is empty.
+ * The skb is dequeued only when pop is nonzero.
+ */
+static struct sk_buff *
+txq_check(struct txq *q, int pop)
+{
+	unsigned long flags;
+	struct sk_buff *skb;
+
+	spin_lock_irqsave(&q->lock, flags);
+	skb = skb_peek(&q->q);
+	if (pop && skb)
+		skb_dequeue(&q->q);
+	spin_unlock_irqrestore(&q->lock, flags);
+
+	return skb;
+}
+
+static struct sk_buff *
+txq_pop(struct txq *q) { return txq_check(q, 1); }
+
+static struct sk_buff *
+txq_peek(struct txq *q) { return txq_check(q, 0); }
 
-// enters with txlock held
+/*
+ * Work handler: transmit the skbs queued on the txq that embeds work.
+ * When aoe_maxtx is positive, stop after that many transmissions and
+ * requeue the work if skbs remain.
+ */
 static void
-tx(void)
+tx(struct work_struct *work)
 {
+	struct txq *q = container_of(work, struct txq, work);
 	struct sk_buff *skb;
+	int i;
+	int cpu = get_cpu(); /* XXXdebug */
+
+	aoedbg_print("%s: running on cpu %d\n", __func__, cpu);
 
-	while ((skb = skb_dequeue(&skbtxq))) {
-		spin_unlock_irq(&txlock);
+	skb = txq_pop(q);
+	for (i = 0; skb; ++i, skb = txq_pop(q)) {
+		aoedbg_print("%s: transmitting on cpu %d\n", __func__, cpu);
+
 		if (dev_queue_xmit(skb) == NET_XMIT_DROP && net_ratelimit())
 			printk(KERN_WARNING
 				"aoe: packet could not be sent on %s. %s\n",
 				skb->dev ? skb->dev->name : "netif",
 				"consider increasing tx_queue_len");
-		spin_lock_irq(&txlock);
+		/* i + 1 skbs are sent; >= tolerates a lowered limit */
+		if (aoe_maxtx > 0 && i + 1 >= aoe_maxtx) {
+			if (txq_peek(q)) {
+				aoedbg_print("%s: scheduling tx work on cpu %d\n",
+					__func__, cpu);
+				schedule_work(work);
+			}
+			break;
+		}
 	}
+	put_cpu(); /* XXXdebug */
 }
 
 /* This function is copied here from linux-2.6.10-rc3-bk11/lib/string.c
@@ -144,15 +197,22 @@ aoenet_xmit(struct sk_buff *sl)
 {
 	struct sk_buff *skb;
 	ulong flags;
+	struct txq *q;
+	int cpu = get_cpu(); /* XXXdebug */
 
+	q = &__get_cpu_var(txq); /* XXXdebug */
 	while ((skb = sl)) {
 		sl = sl->next;
 		skb->next = skb->prev = NULL;
-		spin_lock_irqsave(&txlock, flags);
-		skb_queue_tail(&skbtxq, skb);
-		spin_unlock_irqrestore(&txlock, flags);
-		wake_up(&txwq);
+		spin_lock_irqsave(&q->lock, flags);
+		skb_queue_tail(&q->q, skb);
+		spin_unlock_irqrestore(&q->lock, flags);
+
+		aoedbg_print("%s: scheduling tx work on cpu %d\n",
+			__func__, cpu);
+		schedule_work(&q->work);
 	}
+	put_cpu();
 }
 
 /*
@@ -230,15 +290,15 @@ static struct packet_type aoe_pt = {
 int __init
 aoenet_init(void)
 {
-	skb_queue_head_init(&skbtxq);
-	init_waitqueue_head(&txwq);
-	spin_lock_init(&txlock);
-	kts.lock = &txlock;
-	kts.fn = tx;
-	kts.waitq = &txwq;
-	kts.name = "aoe_tx";
-	if (aoe_ktstart(&kts))
-		return -EAGAIN;
+	int cpu;
+	struct txq *q;
+
+	for_each_cpu(cpu, cpu_possible_mask) {
+		q = &per_cpu(txq, cpu);
+		skb_queue_head_init(&q->q);
+		spin_lock_init(&q->lock);
+		INIT_WORK(&q->work, tx);
+	}
 	dev_add_pack(&aoe_pt);
 	return 0;
 }
@@ -246,8 +306,12 @@ aoenet_init(void)
 void
 aoenet_exit(void)
 {
-	aoe_ktstop(&kts);
-	skb_queue_purge(&skbtxq);
+	int cpu;
+
+	for_each_cpu(cpu, cpu_possible_mask) {
+		/* stop the tx work before dropping what it would have sent */
+		cancel_work_sync(&per_cpu(txq, cpu).work);
+		skb_queue_purge(&(per_cpu(txq, cpu).q));
+	}
 	dev_remove_pack(&aoe_pt);
 }
-