diff options
Diffstat (limited to 'fs/ceph')
-rw-r--r-- | fs/ceph/addr.c | 64 | ||||
-rw-r--r-- | fs/ceph/auth.c | 1 | ||||
-rw-r--r-- | fs/ceph/auth_none.h | 2 | ||||
-rw-r--r-- | fs/ceph/auth_x.c | 32 | ||||
-rw-r--r-- | fs/ceph/caps.c | 44 | ||||
-rw-r--r-- | fs/ceph/dir.c | 16 | ||||
-rw-r--r-- | fs/ceph/file.c | 3 | ||||
-rw-r--r-- | fs/ceph/inode.c | 14 | ||||
-rw-r--r-- | fs/ceph/messenger.c | 31 | ||||
-rw-r--r-- | fs/ceph/osdmap.c | 180 | ||||
-rw-r--r-- | fs/ceph/osdmap.h | 1 | ||||
-rw-r--r-- | fs/ceph/rados.h | 6 | ||||
-rw-r--r-- | fs/ceph/snap.c | 50 | ||||
-rw-r--r-- | fs/ceph/super.c | 7 | ||||
-rw-r--r-- | fs/ceph/super.h | 4 |
15 files changed, 283 insertions, 172 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index aa3cd7c..4b42c2b 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -337,16 +337,15 @@ out: /* * Get ref for the oldest snapc for an inode with dirty data... that is, the * only snap context we are allowed to write back. - * - * Caller holds i_lock. */ -static struct ceph_snap_context *__get_oldest_context(struct inode *inode, - u64 *snap_size) +static struct ceph_snap_context *get_oldest_context(struct inode *inode, + u64 *snap_size) { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_snap_context *snapc = NULL; struct ceph_cap_snap *capsnap = NULL; + spin_lock(&inode->i_lock); list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap, capsnap->context, capsnap->dirty_pages); @@ -357,21 +356,11 @@ static struct ceph_snap_context *__get_oldest_context(struct inode *inode, break; } } - if (!snapc && ci->i_snap_realm) { - snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context); + if (!snapc && ci->i_head_snapc) { + snapc = ceph_get_snap_context(ci->i_head_snapc); dout(" head snapc %p has %d dirty pages\n", snapc, ci->i_wrbuffer_ref_head); } - return snapc; -} - -static struct ceph_snap_context *get_oldest_context(struct inode *inode, - u64 *snap_size) -{ - struct ceph_snap_context *snapc = NULL; - - spin_lock(&inode->i_lock); - snapc = __get_oldest_context(inode, snap_size); spin_unlock(&inode->i_lock); return snapc; } @@ -392,7 +381,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) int len = PAGE_CACHE_SIZE; loff_t i_size; int err = 0; - struct ceph_snap_context *snapc; + struct ceph_snap_context *snapc, *oldest; u64 snap_size = 0; long writeback_stat; @@ -413,13 +402,16 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) dout("writepage %p page %p not dirty?\n", inode, page); goto out; } - if (snapc != get_oldest_context(inode, &snap_size)) { + oldest = get_oldest_context(inode, &snap_size); + if (snapc->seq > oldest->seq) { dout("writepage %p page %p snapc %p not writeable - noop\n", inode, page, (void *)page->private); /* we should only noop if called by kswapd */ WARN_ON((current->flags & PF_MEMALLOC) == 0); + ceph_put_snap_context(oldest); goto out; } + ceph_put_snap_context(oldest); /* is this a partial page at end of file? */ if (snap_size) @@ -458,7 +450,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) ClearPagePrivate(page); end_page_writeback(page); ceph_put_wrbuffer_cap_refs(ci, 1, snapc); - ceph_put_snap_context(snapc); + ceph_put_snap_context(snapc); /* page's reference */ out: return err; } @@ -517,7 +509,7 @@ static void writepages_finish(struct ceph_osd_request *req, u64 bytes = 0; struct ceph_client *client = ceph_inode_to_client(inode); long writeback_stat; - unsigned issued = __ceph_caps_issued(ci, NULL); + unsigned issued = ceph_caps_issued(ci); /* parse reply */ replyhead = msg->front.iov_base; @@ -558,9 +550,9 @@ static void writepages_finish(struct ceph_osd_request *req, dout("inode %p skipping page %p\n", inode, page); wbc->pages_skipped++; } + ceph_put_snap_context((void *)page->private); page->private = 0; ClearPagePrivate(page); - ceph_put_snap_context(snapc); dout("unlocking %d %p\n", i, page); end_page_writeback(page); @@ -618,7 +610,7 @@ static int ceph_writepages_start(struct address_space *mapping, int range_whole = 0; int should_loop = 1; pgoff_t max_pages = 0, max_pages_ever = 0; - struct ceph_snap_context *snapc = NULL, *last_snapc = NULL; + struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc; struct pagevec pvec; int done = 0; int rc = 0; @@ -770,9 +762,10 @@ get_more_pages: } /* only if matching snap context */ - if (snapc != (void *)page->private) { - dout("page snapc %p != oldest %p\n", - (void *)page->private, snapc); + pgsnapc = (void *)page->private; + if (pgsnapc->seq > snapc->seq) { + dout("page snapc %p %lld > oldest %p %lld\n", + pgsnapc, pgsnapc->seq, snapc, snapc->seq); unlock_page(page); if (!locked_pages) continue; /* keep looking for snap */ @@ -914,7 +907,10 @@ static int context_is_writeable_or_written(struct inode *inode, struct ceph_snap_context *snapc) { struct ceph_snap_context *oldest = get_oldest_context(inode, NULL); - return !oldest || snapc->seq <= oldest->seq; + int ret = !oldest || snapc->seq <= oldest->seq; + + ceph_put_snap_context(oldest); + return ret; } /* @@ -936,8 +932,8 @@ static int ceph_update_writeable_page(struct file *file, int pos_in_page = pos & ~PAGE_CACHE_MASK; int end_in_page = pos_in_page + len; loff_t i_size; - struct ceph_snap_context *snapc; int r; + struct ceph_snap_context *snapc, *oldest; retry_locked: /* writepages currently holds page lock, but if we change that later, */ @@ -947,23 +943,24 @@ retry_locked: BUG_ON(!ci->i_snap_realm); down_read(&mdsc->snap_rwsem); BUG_ON(!ci->i_snap_realm->cached_context); - if (page->private && - (void *)page->private != ci->i_snap_realm->cached_context) { + snapc = (void *)page->private; + if (snapc && snapc != ci->i_head_snapc) { /* * this page is already dirty in another (older) snap * context! is it writeable now? */ - snapc = get_oldest_context(inode, NULL); + oldest = get_oldest_context(inode, NULL); up_read(&mdsc->snap_rwsem); - if (snapc != (void *)page->private) { + if (snapc->seq > oldest->seq) { + ceph_put_snap_context(oldest); dout(" page %p snapc %p not current or oldest\n", - page, (void *)page->private); + page, snapc); /* * queue for writeback, and wait for snapc to * be writeable or written */ - snapc = ceph_get_snap_context((void *)page->private); + snapc = ceph_get_snap_context(snapc); unlock_page(page); ceph_queue_writeback(inode); r = wait_event_interruptible(ci->i_cap_wq, @@ -973,6 +970,7 @@ retry_locked: return r; return -EAGAIN; } + ceph_put_snap_context(oldest); /* yay, writeable, do it now (without dropping page lock) */ dout(" page %p snapc %p not current, but oldest\n", diff --git a/fs/ceph/auth.c b/fs/ceph/auth.c index f6394b9..818afe7 100644 --- a/fs/ceph/auth.c +++ b/fs/ceph/auth.c @@ -3,6 +3,7 @@ #include <linux/module.h> #include <linux/slab.h> #include <linux/err.h> +#include <linux/slab.h> #include "types.h" #include "auth_none.h" diff --git a/fs/ceph/auth_none.h b/fs/ceph/auth_none.h index 56c0553..8164df1 100644 --- a/fs/ceph/auth_none.h +++ b/fs/ceph/auth_none.h @@ -1,6 +1,8 @@ #ifndef _FS_CEPH_AUTH_NONE_H #define _FS_CEPH_AUTH_NONE_H +#include <linux/slab.h> + #include "auth.h" /* diff --git a/fs/ceph/auth_x.c b/fs/ceph/auth_x.c index d9001a4..fee5a08 100644 --- a/fs/ceph/auth_x.c +++ b/fs/ceph/auth_x.c @@ -12,8 +12,6 @@ #include "auth.h" #include "decode.h" -struct kmem_cache *ceph_x_ticketbuf_cachep; - #define TEMP_TICKET_BUF_LEN 256 static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed); @@ -131,13 +129,12 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac, char *ticket_buf; u8 struct_v; - dbuf = kmem_cache_alloc(ceph_x_ticketbuf_cachep, GFP_NOFS | GFP_ATOMIC); + dbuf = kmalloc(TEMP_TICKET_BUF_LEN, GFP_NOFS); if (!dbuf) return -ENOMEM; ret = -ENOMEM; - ticket_buf = kmem_cache_alloc(ceph_x_ticketbuf_cachep, - GFP_NOFS | GFP_ATOMIC); + ticket_buf = kmalloc(TEMP_TICKET_BUF_LEN, GFP_NOFS); if (!ticket_buf) goto out_dbuf; @@ -251,9 +248,9 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac, ret = 0; out: - kmem_cache_free(ceph_x_ticketbuf_cachep, ticket_buf); + kfree(ticket_buf); out_dbuf: - kmem_cache_free(ceph_x_ticketbuf_cachep, dbuf); + kfree(dbuf); return ret; bad: @@ -605,8 +602,6 @@ static void ceph_x_destroy(struct ceph_auth_client *ac) remove_ticket_handler(ac, th); } - kmem_cache_destroy(ceph_x_ticketbuf_cachep); - kfree(ac->private); ac->private = NULL; } @@ -641,26 +636,20 @@ int ceph_x_init(struct ceph_auth_client *ac) int ret; dout("ceph_x_init %p\n", ac); + ret = -ENOMEM; xi = kzalloc(sizeof(*xi), GFP_NOFS); if (!xi) - return -ENOMEM; + goto out; - ret = -ENOMEM; - ceph_x_ticketbuf_cachep = kmem_cache_create("ceph_x_ticketbuf", - TEMP_TICKET_BUF_LEN, 8, - (SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD), - NULL); - if (!ceph_x_ticketbuf_cachep) - goto done_nomem; ret = -EINVAL; if (!ac->secret) { pr_err("no secret set (for auth_x protocol)\n"); - goto done_nomem; + goto out_nomem; } ret = ceph_crypto_key_unarmor(&xi->secret, ac->secret); if (ret) - goto done_nomem; + goto out_nomem; xi->starting = true; xi->ticket_handlers = RB_ROOT; @@ -670,10 +659,9 @@ int ceph_x_init(struct ceph_auth_client *ac) ac->ops = &ceph_x_ops; return 0; -done_nomem: +out_nomem: kfree(xi); - if (ceph_x_ticketbuf_cachep) - kmem_cache_destroy(ceph_x_ticketbuf_cachep); +out: return ret; } diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 3710e07..0c16818 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1205,6 +1205,12 @@ retry: if (capsnap->dirty_pages || capsnap->writing) continue; + /* + * if cap writeback already occurred, we should have dropped + * the capsnap in ceph_put_wrbuffer_cap_refs. + */ + BUG_ON(capsnap->dirty == 0); + /* pick mds, take s_mutex */ mds = __ceph_get_cap_mds(ci, &mseq); if (session && session->s_mds != mds) { @@ -1855,8 +1861,8 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc, } else { pr_err("%p auth cap %p not mds%d ???\n", inode, cap, session->s_mds); - spin_unlock(&inode->i_lock); } + spin_unlock(&inode->i_lock); } } @@ -2118,8 +2124,8 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) } spin_unlock(&inode->i_lock); - dout("put_cap_refs %p had %s %s\n", inode, ceph_cap_string(had), - last ? "last" : ""); + dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had), + last ? " last" : "", put ? " put" : ""); if (last && !flushsnaps) ceph_check_caps(ci, 0, NULL); @@ -2143,7 +2149,8 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, { struct inode *inode = &ci->vfs_inode; int last = 0; - int last_snap = 0; + int complete_capsnap = 0; + int drop_capsnap = 0; int found = 0; struct ceph_cap_snap *capsnap = NULL; @@ -2166,19 +2173,32 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { if (capsnap->context == snapc) { found = 1; - capsnap->dirty_pages -= nr; - last_snap = !capsnap->dirty_pages; break; } } BUG_ON(!found); + capsnap->dirty_pages -= nr; + if (capsnap->dirty_pages == 0) { + complete_capsnap = 1; + if (capsnap->dirty == 0) + /* cap writeback completed before we created + * the cap_snap; no FLUSHSNAP is needed */ + drop_capsnap = 1; + } dout("put_wrbuffer_cap_refs on %p cap_snap %p " - " snap %lld %d/%d -> %d/%d %s%s\n", + " snap %lld %d/%d -> %d/%d %s%s%s\n", inode, capsnap, capsnap->context->seq, ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr, ci->i_wrbuffer_ref, capsnap->dirty_pages, last ? " (wrbuffer last)" : "", - last_snap ? " (capsnap last)" : ""); + complete_capsnap ? " (complete capsnap)" : "", + drop_capsnap ? " (drop capsnap)" : ""); + if (drop_capsnap) { + ceph_put_snap_context(capsnap->context); + list_del(&capsnap->ci_item); + list_del(&capsnap->flushing_item); + ceph_put_cap_snap(capsnap); + } } spin_unlock(&inode->i_lock); @@ -2186,10 +2206,12 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, if (last) { ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); iput(inode); - } else if (last_snap) { + } else if (complete_capsnap) { ceph_flush_snaps(ci); wake_up(&ci->i_cap_wq); } + if (drop_capsnap) + iput(inode); } /* @@ -2465,8 +2487,8 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, break; } WARN_ON(capsnap->dirty_pages || capsnap->writing); - dout(" removing cap_snap %p follows %lld\n", - capsnap, follows); + dout(" removing %p cap_snap %p follows %lld\n", + inode, capsnap, follows); ceph_put_snap_context(capsnap->context); list_del(&capsnap->ci_item); list_del(&capsnap->flushing_item); diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 7261dc6..650d2db 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -171,11 +171,11 @@ more: spin_lock(&inode->i_lock); spin_lock(&dcache_lock); + last = dentry; + if (err < 0) goto out_unlock; - last = dentry; - p = p->prev; filp->f_pos++; @@ -312,7 +312,7 @@ more: req->r_readdir_offset = fi->next_offset; req->r_args.readdir.frag = cpu_to_le32(frag); req->r_args.readdir.max_entries = cpu_to_le32(max_entries); - req->r_num_caps = max_entries; + req->r_num_caps = max_entries + 1; err = ceph_mdsc_do_request(mdsc, NULL, req); if (err < 0) { ceph_mdsc_put_request(req); @@ -489,6 +489,7 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, struct inode *inode = ceph_get_snapdir(parent); dout("ENOENT on snapdir %p '%.*s', linking to snapdir %p\n", dentry, dentry->d_name.len, dentry->d_name.name, inode); + BUG_ON(!d_unhashed(dentry)); d_add(dentry, inode); err = 0; } @@ -879,7 +880,16 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, * do_request, above). If there is no trace, we need * to do it here. */ + + /* d_move screws up d_subdirs order */ + ceph_i_clear(new_dir, CEPH_I_COMPLETE); + d_move(old_dentry, new_dentry); + + /* ensure target dentry is invalidated, despite + rehashing bug in vfs_rename_dir */ + new_dentry->d_time = jiffies; + ceph_dentry(new_dentry)->lease_shared_gen = 0; } ceph_mdsc_put_request(req); return err; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 4add3d5..ed6f197 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -665,7 +665,8 @@ more: * throw out any page cache pages in this range. this * may block. */ - truncate_inode_pages_range(inode->i_mapping, pos, pos+len); + truncate_inode_pages_range(inode->i_mapping, pos, + (pos+len) | (PAGE_CACHE_SIZE-1)); } else { pages = alloc_page_vector(num_pages); if (IS_ERR(pages)) { diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index aca82d5..261f3e6 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -886,6 +886,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, struct inode *in = NULL; struct ceph_mds_reply_inode *ininfo; struct ceph_vino vino; + struct ceph_client *client = ceph_sb_to_client(sb); int i = 0; int err = 0; @@ -949,7 +950,14 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, return err; } - if (rinfo->head->is_dentry && !req->r_aborted) { + /* + * ignore null lease/binding on snapdir ENOENT, or else we + * will have trouble splicing in the virtual snapdir later + */ + if (rinfo->head->is_dentry && !req->r_aborted && + (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name, + client->mount_args->snapdir_name, + req->r_dentry->d_name.len))) { /* * lookup link rename : null -> possibly existing inode * mknod symlink mkdir : null -> new inode @@ -989,6 +997,10 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, dn, dn->d_name.len, dn->d_name.name); dout("fill_trace doing d_move %p -> %p\n", req->r_old_dentry, dn); + + /* d_move screws up d_subdirs order */ + ceph_i_clear(dir, CEPH_I_COMPLETE); + d_move(req->r_old_dentry, dn); dout(" src %p '%.*s' dst %p '%.*s'\n", req->r_old_dentry, diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index 8f1715f..509f57d 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c @@ -30,6 +30,10 @@ static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; +#ifdef CONFIG_LOCKDEP +static struct lock_class_key socket_class; +#endif + static void queue_con(struct ceph_connection *con); static void con_work(struct work_struct *); @@ -228,6 +232,10 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con) con->sock = sock; sock->sk->sk_allocation = GFP_NOFS; +#ifdef CONFIG_LOCKDEP + lockdep_set_class(&sock->sk->sk_lock, &socket_class); +#endif + set_sock_callbacks(sock, con); dout("connect %s\n", pr_addr(&con->peer_addr.in_addr)); @@ -333,6 +341,7 @@ static void reset_connection(struct ceph_connection *con) con->out_msg = NULL; } con->in_seq = 0; + con->in_seq_acked = 0; } /* @@ -1325,6 +1334,7 @@ static int read_partial_message(struct ceph_connection *con) unsigned front_len, middle_len, data_len, data_off; int datacrc = con->msgr->nocrc; int skip; + u64 seq; dout("read_partial_message con %p msg %p\n", con, m); @@ -1359,6 +1369,25 @@ static int read_partial_message(struct ceph_connection *con) return -EIO; data_off = le16_to_cpu(con->in_hdr.data_off); + /* verify seq# */ + seq = le64_to_cpu(con->in_hdr.seq); + if ((s64)seq - (s64)con->in_seq < 1) { + pr_info("skipping %s%lld %s seq %lld, expected %lld\n", + ENTITY_NAME(con->peer_name), + pr_addr(&con->peer_addr.in_addr), + seq, con->in_seq + 1); + con->in_base_pos = -front_len - middle_len - data_len - + sizeof(m->footer); + con->in_tag = CEPH_MSGR_TAG_READY; + con->in_seq++; + return 0; + } else if ((s64)seq - (s64)con->in_seq > 1) { + pr_err("read_partial_message bad seq %lld expected %lld\n", + seq, con->in_seq + 1); + con->error_msg = "bad message sequence # for incoming message"; + return -EBADMSG; + } + /* allocate message? */ if (!con->in_msg) { dout("got hdr type %d front %d data %d\n", con->in_hdr.type, @@ -1370,6 +1399,7 @@ static int read_partial_message(struct ceph_connection *con) con->in_base_pos = -front_len - middle_len - data_len - sizeof(m->footer); con->in_tag = CEPH_MSGR_TAG_READY; + con->in_seq++; return 0; } if (IS_ERR(con->in_msg)) { @@ -2021,6 +2051,7 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) ceph_msg_put(con->in_msg); con->in_msg = NULL; con->in_tag = CEPH_MSGR_TAG_READY; + con->in_seq++; } else { dout("con_revoke_pages %p msg %p pages %p no-op\n", con, con->in_msg, msg); diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c index 21c6623..2e2c15e 100644 --- a/fs/ceph/osdmap.c +++ b/fs/ceph/osdmap.c @@ -314,71 +314,6 @@ bad: return ERR_PTR(err); } - -/* - * osd map - */ -void ceph_osdmap_destroy(struct ceph_osdmap *map) -{ - dout("osdmap_destroy %p\n", map); - if (map->crush) - crush_destroy(map->crush); - while (!RB_EMPTY_ROOT(&map->pg_temp)) { - struct ceph_pg_mapping *pg = - rb_entry(rb_first(&map->pg_temp), - struct ceph_pg_mapping, node); - rb_erase(&pg->node, &map->pg_temp); - kfree(pg); - } - while (!RB_EMPTY_ROOT(&map->pg_pools)) { - struct ceph_pg_pool_info *pi = - rb_entry(rb_first(&map->pg_pools), - struct ceph_pg_pool_info, node); - rb_erase(&pi->node, &map->pg_pools); - kfree(pi); - } - kfree(map->osd_state); - kfree(map->osd_weight); - kfree(map->osd_addr); - kfree(map); -} - -/* - * adjust max osd value. reallocate arrays. - */ -static int osdmap_set_max_osd(struct ceph_osdmap *map, int max) -{ - u8 *state; - struct ceph_entity_addr *addr; - u32 *weight; - - state = kcalloc(max, sizeof(*state), GFP_NOFS); - addr = kcalloc(max, sizeof(*addr), GFP_NOFS); - weight = kcalloc(max, sizeof(*weight), GFP_NOFS); - if (state == NULL || addr == NULL || weight == NULL) { - kfree(state); - kfree(addr); - kfree(weight); - return -ENOMEM; - } - - /* copy old? */ - if (map->osd_state) { - memcpy(state, map->osd_state, map->max_osd*sizeof(*state)); - memcpy(addr, map->osd_addr, map->max_osd*sizeof(*addr)); - memcpy(weight, map->osd_weight, map->max_osd*sizeof(*weight)); - kfree(map->osd_state); - kfree(map->osd_addr); - kfree(map->osd_weight); - } - - map->osd_state = state; - map->osd_weight = weight; - map->osd_addr = addr; - map->max_osd = max; - return 0; -} - /* * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid * to a set of osds) @@ -482,6 +417,13 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id) return NULL; } +static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi) +{ + rb_erase(&pi->node, root); + kfree(pi->name); + kfree(pi); +} + void __decode_pool(void **p, struct ceph_pg_pool_info *pi) { ceph_decode_copy(p, &pi->v, sizeof(pi->v)); @@ -490,6 +432,98 @@ void __decode_pool(void **p, struct ceph_pg_pool_info *pi) *p += le32_to_cpu(pi->v.num_removed_snap_intervals) * sizeof(u64) * 2; } +static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map) +{ + struct ceph_pg_pool_info *pi; + u32 num, len, pool; + + ceph_decode_32_safe(p, end, num, bad); + dout(" %d pool names\n", num); + while (num--) { + ceph_decode_32_safe(p, end, pool, bad); + ceph_decode_32_safe(p, end, len, bad); + dout(" pool %d len %d\n", pool, len); + pi = __lookup_pg_pool(&map->pg_pools, pool); + if (pi) { + kfree(pi->name); + pi->name = kmalloc(len + 1, GFP_NOFS); + if (pi->name) { + memcpy(pi->name, *p, len); + pi->name[len] = '\0'; + dout(" name is %s\n", pi->name); + } + } + *p += len; + } + return 0; + +bad: + return -EINVAL; +} + +/* + * osd map + */ +void ceph_osdmap_destroy(struct ceph_osdmap *map) +{ + dout("osdmap_destroy %p\n", map); + if (map->crush) + crush_destroy(map->crush); + while (!RB_EMPTY_ROOT(&map->pg_temp)) { + struct ceph_pg_mapping *pg = + rb_entry(rb_first(&map->pg_temp), + struct ceph_pg_mapping, node); + rb_erase(&pg->node, &map->pg_temp); + kfree(pg); + } + while (!RB_EMPTY_ROOT(&map->pg_pools)) { + struct ceph_pg_pool_info *pi = + rb_entry(rb_first(&map->pg_pools), + struct ceph_pg_pool_info, node); + __remove_pg_pool(&map->pg_pools, pi); + } + kfree(map->osd_state); + kfree(map->osd_weight); + kfree(map->osd_addr); + kfree(map); +} + +/* + * adjust max osd value. reallocate arrays. + */ +static int osdmap_set_max_osd(struct ceph_osdmap *map, int max) +{ + u8 *state; + struct ceph_entity_addr *addr; + u32 *weight; + + state = kcalloc(max, sizeof(*state), GFP_NOFS); + addr = kcalloc(max, sizeof(*addr), GFP_NOFS); + weight = kcalloc(max, sizeof(*weight), GFP_NOFS); + if (state == NULL || addr == NULL || weight == NULL) { + kfree(state); + kfree(addr); + kfree(weight); + return -ENOMEM; + } + + /* copy old? */ + if (map->osd_state) { + memcpy(state, map->osd_state, map->max_osd*sizeof(*state)); + memcpy(addr, map->osd_addr, map->max_osd*sizeof(*addr)); + memcpy(weight, map->osd_weight, map->max_osd*sizeof(*weight)); + kfree(map->osd_state); + kfree(map->osd_addr); + kfree(map->osd_weight); + } + + map->osd_state = state; + map->osd_weight = weight; + map->osd_addr = addr; + map->max_osd = max; + return 0; +} + /* * decode a full map. */ @@ -526,7 +560,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) ceph_decode_32_safe(p, end, max, bad); while (max--) { ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad); - pi = kmalloc(sizeof(*pi), GFP_NOFS); + pi = kzalloc(sizeof(*pi), GFP_NOFS); if (!pi) goto bad; pi->id = ceph_decode_32(p); @@ -539,6 +573,10 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) __decode_pool(p, pi); __insert_pg_pool(&map->pg_pools, pi); } + + if (version >= 5 && __decode_pool_names(p, end, map) < 0) + goto bad; + ceph_decode_32_safe(p, end, map->pool_max, bad); ceph_decode_32_safe(p, end, map->flags, bad); @@ -712,7 +750,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, } pi = __lookup_pg_pool(&map->pg_pools, pool); if (!pi) { - pi = kmalloc(sizeof(*pi), GFP_NOFS); + pi = kzalloc(sizeof(*pi), GFP_NOFS); if (!pi) { err = -ENOMEM; goto bad; @@ -722,6 +760,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, } __decode_pool(p, pi); } + if (version >= 5 && __decode_pool_names(p, end, map) < 0) + goto bad; /* old_pool */ ceph_decode_32_safe(p, end, len, bad); @@ -730,10 +770,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, ceph_decode_32_safe(p, end, pool, bad); pi = __lookup_pg_pool(&map->pg_pools, pool); - if (pi) { - rb_erase(&pi->node, &map->pg_pools); - kfree(pi); - } + if (pi) + __remove_pg_pool(&map->pg_pools, pi); } /* new_up */ diff --git a/fs/ceph/osdmap.h b/fs/ceph/osdmap.h index 1fb55af..8bc9f1e 100644 --- a/fs/ceph/osdmap.h +++ b/fs/ceph/osdmap.h @@ -23,6 +23,7 @@ struct ceph_pg_pool_info { int id; struct ceph_pg_pool v; int pg_num_mask, pgp_num_mask, lpg_num_mask, lpgp_num_mask; + char *name; }; struct ceph_pg_mapping { diff --git a/fs/ceph/rados.h b/fs/ceph/rados.h index 26ac8b8..a1fc1d0 100644 --- a/fs/ceph/rados.h +++ b/fs/ceph/rados.h @@ -11,8 +11,10 @@ /* * osdmap encoding versions */ -#define CEPH_OSDMAP_INC_VERSION 4 -#define CEPH_OSDMAP_VERSION 4 +#define CEPH_OSDMAP_INC_VERSION 5 +#define CEPH_OSDMAP_INC_VERSION_EXT 5 +#define CEPH_OSDMAP_VERSION 5 +#define CEPH_OSDMAP_VERSION_EXT 5 /* * fs id diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index e6f9bc5..d5114db 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -431,8 +431,7 @@ static int dup_array(u64 **dst, __le64 *src, int num) * Caller must hold snap_rwsem for read (i.e., the realm topology won't * change). */ -void ceph_queue_cap_snap(struct ceph_inode_info *ci, - struct ceph_snap_context *snapc) +void ceph_queue_cap_snap(struct ceph_inode_info *ci) { struct inode *inode = &ci->vfs_inode; struct ceph_cap_snap *capsnap; @@ -451,10 +450,11 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci, as no new writes are allowed to start when pending, so any writes in progress now were started before the previous cap_snap. lucky us. */ - dout("queue_cap_snap %p snapc %p seq %llu used %d" - " already pending\n", inode, snapc, snapc->seq, used); + dout("queue_cap_snap %p already pending\n", inode); kfree(capsnap); } else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR)) { + struct ceph_snap_context *snapc = ci->i_head_snapc; + igrab(inode); atomic_set(&capsnap->nref, 1); @@ -463,7 +463,6 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci, INIT_LIST_HEAD(&capsnap->flushing_item); capsnap->follows = snapc->seq - 1; - capsnap->context = ceph_get_snap_context(snapc); capsnap->issued = __ceph_caps_issued(ci, NULL); capsnap->dirty = __ceph_caps_dirty(ci); @@ -480,7 +479,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci, snapshot. */ capsnap->dirty_pages = ci->i_wrbuffer_ref_head; ci->i_wrbuffer_ref_head = 0; - ceph_put_snap_context(ci->i_head_snapc); + capsnap->context = snapc; ci->i_head_snapc = NULL; list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps); @@ -522,15 +521,17 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci, capsnap->ctime = inode->i_ctime; capsnap->time_warp_seq = ci->i_time_warp_seq; if (capsnap->dirty_pages) { - dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu " + dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu " "still has %d dirty pages\n", inode, capsnap, capsnap->context, capsnap->context->seq, - capsnap->size, capsnap->dirty_pages); + ceph_cap_string(capsnap->dirty), capsnap->size, + capsnap->dirty_pages); return 0; } - dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu clean\n", + dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n", inode, capsnap, capsnap->context, - capsnap->context->seq, capsnap->size); + capsnap->context->seq, ceph_cap_string(capsnap->dirty), + capsnap->size); spin_lock(&mdsc->snap_flush_lock); list_add_tail(&ci->i_snap_flush_item, &mdsc->snap_flush_list); @@ -602,7 +603,7 @@ more: if (lastinode) iput(lastinode); lastinode = inode; - ceph_queue_cap_snap(ci, realm->cached_context); + ceph_queue_cap_snap(ci); spin_lock(&realm->inodes_with_caps_lock); } spin_unlock(&realm->inodes_with_caps_lock); @@ -824,8 +825,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, spin_unlock(&realm->inodes_with_caps_lock); spin_unlock(&inode->i_lock); - ceph_queue_cap_snap(ci, - ci->i_snap_realm->cached_context); + ceph_queue_cap_snap(ci); iput(inode); continue; @@ -869,16 +869,20 @@ skip_inode: continue; ci = ceph_inode(inode); spin_lock(&inode->i_lock); - if (!ci->i_snap_realm) - goto split_skip_inode; - ceph_put_snap_realm(mdsc, ci->i_snap_realm); - spin_lock(&realm->inodes_with_caps_lock); - list_add(&ci->i_snap_realm_item, - &realm->inodes_with_caps); - ci->i_snap_realm = realm; - spin_unlock(&realm->inodes_with_caps_lock); - ceph_get_snap_realm(mdsc, realm); -split_skip_inode: + if (list_empty(&ci->i_snap_realm_item)) { + struct ceph_snap_realm *oldrealm = + ci->i_snap_realm; + + dout(" moving %p to split realm %llx %p\n", + inode, realm->ino, realm); + spin_lock(&realm->inodes_with_caps_lock); + list_add(&ci->i_snap_realm_item, + &realm->inodes_with_caps); + ci->i_snap_realm = realm; + spin_unlock(&realm->inodes_with_caps_lock); + ceph_get_snap_realm(mdsc, realm); + ceph_put_snap_realm(mdsc, oldrealm); + } spin_unlock(&inode->i_lock); iput(inode); } diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 75d02ea..f888cf4 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -996,9 +996,10 @@ static int __init init_ceph(void) if (ret) goto out_icache; - pr_info("loaded %d.%d.%d (mon/mds/osd proto %d/%d/%d)\n", - CEPH_VERSION_MAJOR, CEPH_VERSION_MINOR, CEPH_VERSION_PATCH, - CEPH_MONC_PROTOCOL, CEPH_MDSC_PROTOCOL, CEPH_OSDC_PROTOCOL); + pr_info("loaded (mon/mds/osd proto %d/%d/%d, osdmap %d/%d %d/%d)\n", + CEPH_MONC_PROTOCOL, CEPH_MDSC_PROTOCOL, CEPH_OSDC_PROTOCOL, + CEPH_OSDMAP_VERSION, CEPH_OSDMAP_VERSION_EXT, + CEPH_OSDMAP_INC_VERSION, CEPH_OSDMAP_INC_VERSION_EXT); return 0; out_icache: diff --git a/fs/ceph/super.h b/fs/ceph/super.h index ca702c6..13513b8 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -10,6 +10,7 @@ #include <linux/fs.h> #include <linux/mempool.h> #include <linux/pagemap.h> +#include <linux/slab.h> #include <linux/wait.h> #include <linux/writeback.h> #include <linux/slab.h> @@ -715,8 +716,7 @@ extern int ceph_update_snap_trace(struct ceph_mds_client *m, extern void ceph_handle_snap(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, struct ceph_msg *msg); -extern void ceph_queue_cap_snap(struct ceph_inode_info *ci, - struct ceph_snap_context *snapc); +extern void ceph_queue_cap_snap(struct ceph_inode_info *ci); extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci, struct ceph_cap_snap *capsnap); extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc); |