@@ -1571,18 +1571,16 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
15711571 struct inode * inode = file_inode (file );
15721572 struct ceph_inode_info * ci = ceph_inode (inode );
15731573 struct ceph_fs_client * fsc = ceph_inode_to_client (inode );
1574- struct ceph_vino vino ;
1574+ struct ceph_osd_client * osdc = & fsc -> client -> osdc ;
15751575 struct ceph_osd_request * req ;
15761576 struct page * * pages ;
15771577 u64 len ;
15781578 int num_pages ;
15791579 int written = 0 ;
1580- int flags ;
15811580 int ret ;
15821581 bool check_caps = false;
15831582 struct timespec64 mtime = current_time (inode );
15841583 size_t count = iov_iter_count (from );
1585- size_t off ;
15861584
15871585 if (ceph_snap (file_inode (file )) != CEPH_NOSNAP )
15881586 return - EROFS ;
@@ -1602,72 +1600,335 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
16021600 if (ret < 0 )
16031601 dout ("invalidate_inode_pages2_range returned %d\n" , ret );
16041602
1605- flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE ;
1606-
16071603 while ((len = iov_iter_count (from )) > 0 ) {
16081604 size_t left ;
16091605 int n ;
1606+ u64 write_pos = pos ;
1607+ u64 write_len = len ;
1608+ u64 objnum , objoff ;
1609+ u32 xlen ;
1610+ u64 assert_ver = 0 ;
1611+ bool rmw ;
1612+ bool first , last ;
1613+ struct iov_iter saved_iter = * from ;
1614+ size_t off ;
1615+
1616+ ceph_fscrypt_adjust_off_and_len (inode , & write_pos , & write_len );
1617+
1618+ /* clamp the length to the end of first object */
1619+ ceph_calc_file_object_mapping (& ci -> i_layout , write_pos ,
1620+ write_len , & objnum , & objoff ,
1621+ & xlen );
1622+ write_len = xlen ;
1623+
1624+ /* adjust len downward if it goes beyond current object */
1625+ if (pos + len > write_pos + write_len )
1626+ len = write_pos + write_len - pos ;
16101627
1611- vino = ceph_vino (inode );
1612- req = ceph_osdc_new_request (& fsc -> client -> osdc , & ci -> i_layout ,
1613- vino , pos , & len , 0 , 1 ,
1614- CEPH_OSD_OP_WRITE , flags , snapc ,
1615- ci -> i_truncate_seq ,
1616- ci -> i_truncate_size ,
1617- false);
1618- if (IS_ERR (req )) {
1619- ret = PTR_ERR (req );
1620- break ;
1621- }
1628+ /*
1629+ * If we had to adjust the length or position to align with a
1630+ * crypto block, then we must do a read/modify/write cycle. We
1631+ * use a version assertion to redrive the thing if something
1632+ * changes in between.
1633+ */
1634+ first = pos != write_pos ;
1635+ last = (pos + len ) != (write_pos + write_len );
1636+ rmw = first || last ;
16221637
1623- num_pages = calc_pages_for (pos , len );
1638+ dout ("sync_write ino %llx %lld~%llu adjusted %lld~%llu -- %srmw\n" ,
1639+ ci -> i_vino .ino , pos , len , write_pos , write_len ,
1640+ rmw ? "" : "no " );
1641+
1642+ /*
1643+ * The data is emplaced into the page as it would be if it were
1644+ * in an array of pagecache pages.
1645+ */
1646+ num_pages = calc_pages_for (write_pos , write_len );
16241647 pages = ceph_alloc_page_vector (num_pages , GFP_KERNEL );
16251648 if (IS_ERR (pages )) {
16261649 ret = PTR_ERR (pages );
1627- goto out ;
1650+ break ;
1651+ }
1652+
1653+ /* Do we need to preload the pages? */
1654+ if (rmw ) {
1655+ u64 first_pos = write_pos ;
1656+ u64 last_pos = (write_pos + write_len ) - CEPH_FSCRYPT_BLOCK_SIZE ;
1657+ u64 read_len = CEPH_FSCRYPT_BLOCK_SIZE ;
1658+ struct ceph_osd_req_op * op ;
1659+
1660+ /* We should only need to do this for encrypted inodes */
1661+ WARN_ON_ONCE (!IS_ENCRYPTED (inode ));
1662+
1663+ /* No need to do two reads if first and last blocks are same */
1664+ if (first && last_pos == first_pos )
1665+ last = false;
1666+
1667+ /*
1668+ * Allocate a read request for one or two extents,
1669+ * depending on how the request was aligned.
1670+ */
1671+ req = ceph_osdc_new_request (osdc , & ci -> i_layout ,
1672+ ci -> i_vino , first ? first_pos : last_pos ,
1673+ & read_len , 0 , (first && last ) ? 2 : 1 ,
1674+ CEPH_OSD_OP_SPARSE_READ , CEPH_OSD_FLAG_READ ,
1675+ NULL , ci -> i_truncate_seq ,
1676+ ci -> i_truncate_size , false);
1677+ if (IS_ERR (req )) {
1678+ ceph_release_page_vector (pages , num_pages );
1679+ ret = PTR_ERR (req );
1680+ break ;
1681+ }
1682+
1683+ /* Something is misaligned! */
1684+ if (read_len != CEPH_FSCRYPT_BLOCK_SIZE ) {
1685+ ceph_osdc_put_request (req );
1686+ ceph_release_page_vector (pages , num_pages );
1687+ ret = - EIO ;
1688+ break ;
1689+ }
1690+
1691+ /* Add extent for first block? */
1692+ op = & req -> r_ops [0 ];
1693+
1694+ if (first ) {
1695+ osd_req_op_extent_osd_data_pages (req , 0 , pages ,
1696+ CEPH_FSCRYPT_BLOCK_SIZE ,
1697+ offset_in_page (first_pos ),
1698+ false, false);
1699+ /* We only expect a single extent here */
1700+ ret = __ceph_alloc_sparse_ext_map (op , 1 );
1701+ if (ret ) {
1702+ ceph_osdc_put_request (req );
1703+ ceph_release_page_vector (pages , num_pages );
1704+ break ;
1705+ }
1706+ }
1707+
1708+ /* Add extent for last block */
1709+ if (last ) {
1710+ /* Init the other extent if first extent has been used */
1711+ if (first ) {
1712+ op = & req -> r_ops [1 ];
1713+ osd_req_op_extent_init (req , 1 ,
1714+ CEPH_OSD_OP_SPARSE_READ ,
1715+ last_pos , CEPH_FSCRYPT_BLOCK_SIZE ,
1716+ ci -> i_truncate_size ,
1717+ ci -> i_truncate_seq );
1718+ }
1719+
1720+ ret = __ceph_alloc_sparse_ext_map (op , 1 );
1721+ if (ret ) {
1722+ ceph_osdc_put_request (req );
1723+ ceph_release_page_vector (pages , num_pages );
1724+ break ;
1725+ }
1726+
1727+ osd_req_op_extent_osd_data_pages (req , first ? 1 : 0 ,
1728+ & pages [num_pages - 1 ],
1729+ CEPH_FSCRYPT_BLOCK_SIZE ,
1730+ offset_in_page (last_pos ),
1731+ false, false);
1732+ }
1733+
1734+ ceph_osdc_start_request (osdc , req );
1735+ ret = ceph_osdc_wait_request (osdc , req );
1736+
1737+ /* FIXME: length field is wrong if there are 2 extents */
1738+ ceph_update_read_metrics (& fsc -> mdsc -> metric ,
1739+ req -> r_start_latency ,
1740+ req -> r_end_latency ,
1741+ read_len , ret );
1742+
1743+ /* Ok if object is not already present */
1744+ if (ret == - ENOENT ) {
1745+ /*
1746+ * If there is no object, then we can't assert
1747+ * on its version. Set it to 0, and we'll use an
1748+ * exclusive create instead.
1749+ */
1750+ ceph_osdc_put_request (req );
1751+ ret = 0 ;
1752+
1753+ /*
1754+ * zero out the soon-to-be uncopied parts of the
1755+ * first and last pages.
1756+ */
1757+ if (first )
1758+ zero_user_segment (pages [0 ], 0 ,
1759+ offset_in_page (first_pos ));
1760+ if (last )
1761+ zero_user_segment (pages [num_pages - 1 ],
1762+ offset_in_page (last_pos ),
1763+ PAGE_SIZE );
1764+ } else {
1765+ if (ret < 0 ) {
1766+ ceph_osdc_put_request (req );
1767+ ceph_release_page_vector (pages , num_pages );
1768+ break ;
1769+ }
1770+
1771+ op = & req -> r_ops [0 ];
1772+ if (op -> extent .sparse_ext_cnt == 0 ) {
1773+ if (first )
1774+ zero_user_segment (pages [0 ], 0 ,
1775+ offset_in_page (first_pos ));
1776+ else
1777+ zero_user_segment (pages [num_pages - 1 ],
1778+ offset_in_page (last_pos ),
1779+ PAGE_SIZE );
1780+ } else if (op -> extent .sparse_ext_cnt != 1 ||
1781+ ceph_sparse_ext_map_end (op ) !=
1782+ CEPH_FSCRYPT_BLOCK_SIZE ) {
1783+ ret = - EIO ;
1784+ ceph_osdc_put_request (req );
1785+ ceph_release_page_vector (pages , num_pages );
1786+ break ;
1787+ }
1788+
1789+ if (first && last ) {
1790+ op = & req -> r_ops [1 ];
1791+ if (op -> extent .sparse_ext_cnt == 0 ) {
1792+ zero_user_segment (pages [num_pages - 1 ],
1793+ offset_in_page (last_pos ),
1794+ PAGE_SIZE );
1795+ } else if (op -> extent .sparse_ext_cnt != 1 ||
1796+ ceph_sparse_ext_map_end (op ) !=
1797+ CEPH_FSCRYPT_BLOCK_SIZE ) {
1798+ ret = - EIO ;
1799+ ceph_osdc_put_request (req );
1800+ ceph_release_page_vector (pages , num_pages );
1801+ break ;
1802+ }
1803+ }
1804+
1805+ /* Grab assert version. It must be non-zero. */
1806+ assert_ver = req -> r_version ;
1807+ WARN_ON_ONCE (ret > 0 && assert_ver == 0 );
1808+
1809+ ceph_osdc_put_request (req );
1810+ if (first ) {
1811+ ret = ceph_fscrypt_decrypt_block_inplace (inode ,
1812+ pages [0 ], CEPH_FSCRYPT_BLOCK_SIZE ,
1813+ offset_in_page (first_pos ),
1814+ first_pos >> CEPH_FSCRYPT_BLOCK_SHIFT );
1815+ if (ret < 0 ) {
1816+ ceph_release_page_vector (pages , num_pages );
1817+ break ;
1818+ }
1819+ }
1820+ if (last ) {
1821+ ret = ceph_fscrypt_decrypt_block_inplace (inode ,
1822+ pages [num_pages - 1 ],
1823+ CEPH_FSCRYPT_BLOCK_SIZE ,
1824+ offset_in_page (last_pos ),
1825+ last_pos >> CEPH_FSCRYPT_BLOCK_SHIFT );
1826+ if (ret < 0 ) {
1827+ ceph_release_page_vector (pages , num_pages );
1828+ break ;
1829+ }
1830+ }
1831+ }
16281832 }
16291833
16301834 left = len ;
16311835 off = offset_in_page (pos );
16321836 for (n = 0 ; n < num_pages ; n ++ ) {
16331837 size_t plen = min_t (size_t , left , PAGE_SIZE - off );
16341838
1839+ /* copy the data */
16351840 ret = copy_page_from_iter (pages [n ], off , plen , from );
1636- off = 0 ;
16371841 if (ret != plen ) {
16381842 ret = - EFAULT ;
16391843 break ;
16401844 }
1845+ off = 0 ;
16411846 left -= ret ;
16421847 }
1643-
16441848 if (ret < 0 ) {
1849+ dout ("sync_write write failed with %d\n" , ret );
16451850 ceph_release_page_vector (pages , num_pages );
1646- goto out ;
1851+ break ;
16471852 }
16481853
1649- req -> r_inode = inode ;
1854+ if (IS_ENCRYPTED (inode )) {
1855+ ret = ceph_fscrypt_encrypt_pages (inode , pages ,
1856+ write_pos , write_len ,
1857+ GFP_KERNEL );
1858+ if (ret < 0 ) {
1859+ dout ("encryption failed with %d\n" , ret );
1860+ ceph_release_page_vector (pages , num_pages );
1861+ break ;
1862+ }
1863+ }
16501864
1651- osd_req_op_extent_osd_data_pages (req , 0 , pages , len ,
1652- offset_in_page (pos ),
1653- false, true);
1865+ req = ceph_osdc_new_request (osdc , & ci -> i_layout ,
1866+ ci -> i_vino , write_pos , & write_len ,
1867+ rmw ? 1 : 0 , rmw ? 2 : 1 ,
1868+ CEPH_OSD_OP_WRITE ,
1869+ CEPH_OSD_FLAG_WRITE ,
1870+ snapc , ci -> i_truncate_seq ,
1871+ ci -> i_truncate_size , false);
1872+ if (IS_ERR (req )) {
1873+ ret = PTR_ERR (req );
1874+ ceph_release_page_vector (pages , num_pages );
1875+ break ;
1876+ }
16541877
1878+ dout ("sync_write write op %lld~%llu\n" , write_pos , write_len );
1879+ osd_req_op_extent_osd_data_pages (req , rmw ? 1 : 0 , pages , write_len ,
1880+ offset_in_page (write_pos ), false,
1881+ true);
1882+ req -> r_inode = inode ;
16551883 req -> r_mtime = mtime ;
1656- ceph_osdc_start_request (& fsc -> client -> osdc , req );
1657- ret = ceph_osdc_wait_request (& fsc -> client -> osdc , req );
1884+
1885+ /* Set up the assertion */
1886+ if (rmw ) {
1887+ /*
1888+ * Set up the assertion. If we don't have a version
1889+ * number, then the object doesn't exist yet. Use an
1890+ * exclusive create instead of a version assertion in
1891+ * that case.
1892+ */
1893+ if (assert_ver ) {
1894+ osd_req_op_init (req , 0 , CEPH_OSD_OP_ASSERT_VER , 0 );
1895+ req -> r_ops [0 ].assert_ver .ver = assert_ver ;
1896+ } else {
1897+ osd_req_op_init (req , 0 , CEPH_OSD_OP_CREATE ,
1898+ CEPH_OSD_OP_FLAG_EXCL );
1899+ }
1900+ }
1901+
1902+ ceph_osdc_start_request (osdc , req );
1903+ ret = ceph_osdc_wait_request (osdc , req );
16581904
16591905 ceph_update_write_metrics (& fsc -> mdsc -> metric , req -> r_start_latency ,
16601906 req -> r_end_latency , len , ret );
1661- out :
16621907 ceph_osdc_put_request (req );
16631908 if (ret != 0 ) {
1909+ dout ("sync_write osd write returned %d\n" , ret );
1910+ /* Version changed! Must re-do the rmw cycle */
1911+ if ((assert_ver && (ret == - ERANGE || ret == - EOVERFLOW )) ||
1912+ (!assert_ver && ret == - EEXIST )) {
1913+ /* We should only ever see this on a rmw */
1914+ WARN_ON_ONCE (!rmw );
1915+
1916+ /* The version should never go backward */
1917+ WARN_ON_ONCE (ret == - EOVERFLOW );
1918+
1919+ * from = saved_iter ;
1920+
1921+ /* FIXME: limit number of times we loop? */
1922+ continue ;
1923+ }
16641924 ceph_set_error_write (ci );
16651925 break ;
16661926 }
16671927
16681928 ceph_clear_error_write (ci );
16691929 pos += len ;
16701930 written += len ;
1931+ dout ("sync_write written %d\n" , written );
16711932 if (pos > i_size_read (inode )) {
16721933 check_caps = ceph_inode_set_size (inode , pos );
16731934 if (check_caps )
@@ -1681,6 +1942,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
16811942 ret = written ;
16821943 iocb -> ki_pos = pos ;
16831944 }
1945+ dout ("sync_write returning %d\n" , ret );
16841946 return ret ;
16851947}
16861948
0 commit comments