bpo-26826: Expose copy_file_range in the os module (GH-7255) · python/cpython@aac4d03 (original) (raw)

`@@ -231,6 +231,89 @@ def test_symlink_keywords(self):

`

231

231

`except (NotImplementedError, OSError):

`

232

232

`pass # No OS support or unprivileged user

`

233

233

``

``

234

`+

@unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')

`

``

235

`+

def test_copy_file_range_invalid_values(self):

`

``

236

`+

with self.assertRaises(ValueError):

`

``

237

`+

os.copy_file_range(0, 1, -10)

`

``

238

+

``

239

`+

@unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')

`

``

240

`+

def test_copy_file_range(self):

`

``

241

`+

TESTFN2 = support.TESTFN + ".3"

`

``

242

`+

data = b'0123456789'

`

``

243

+

``

244

`+

create_file(support.TESTFN, data)

`

``

245

`+

self.addCleanup(support.unlink, support.TESTFN)

`

``

246

+

``

247

`+

in_file = open(support.TESTFN, 'rb')

`

``

248

`+

self.addCleanup(in_file.close)

`

``

249

`+

in_fd = in_file.fileno()

`

``

250

+

``

251

`+

out_file = open(TESTFN2, 'w+b')

`

``

252

`+

self.addCleanup(support.unlink, TESTFN2)

`

``

253

`+

self.addCleanup(out_file.close)

`

``

254

`+

out_fd = out_file.fileno()

`

``

255

+

``

256

`+

try:

`

``

257

`+

i = os.copy_file_range(in_fd, out_fd, 5)

`

``

258

`+

except OSError as e:

`

``

259

`+

Handle the case in which Python was compiled

`

``

260

`+

in a system with the syscall but without support

`

``

261

`+

in the kernel.

`

``

262

`+

if e.errno != errno.ENOSYS:

`

``

263

`+

raise

`

``

264

`+

self.skipTest(e)

`

``

265

`+

else:

`

``

266

`+

The number of copied bytes can be less than

`

``

267

`+

the number of bytes originally requested.

`

``

268

`+

self.assertIn(i, range(0, 6));

`

``

269

+

``

270

`+

with open(TESTFN2, 'rb') as in_file:

`

``

271

`+

self.assertEqual(in_file.read(), data[:i])

`

``

272

+

``

273

`+

@unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')

`

``

274

`+

def test_copy_file_range_offset(self):

`

``

275

`+

TESTFN4 = support.TESTFN + ".4"

`

``

276

`+

data = b'0123456789'

`

``

277

`+

bytes_to_copy = 6

`

``

278

`+

in_skip = 3

`

``

279

`+

out_seek = 5

`

``

280

+

``

281

`+

create_file(support.TESTFN, data)

`

``

282

`+

self.addCleanup(support.unlink, support.TESTFN)

`

``

283

+

``

284

`+

in_file = open(support.TESTFN, 'rb')

`

``

285

`+

self.addCleanup(in_file.close)

`

``

286

`+

in_fd = in_file.fileno()

`

``

287

+

``

288

`+

out_file = open(TESTFN4, 'w+b')

`

``

289

`+

self.addCleanup(support.unlink, TESTFN4)

`

``

290

`+

self.addCleanup(out_file.close)

`

``

291

`+

out_fd = out_file.fileno()

`

``

292

+

``

293

`+

try:

`

``

294

`+

i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,

`

``

295

`+

offset_src=in_skip,

`

``

296

`+

offset_dst=out_seek)

`

``

297

`+

except OSError as e:

`

``

298

`+

Handle the case in which Python was compiled

`

``

299

`+

in a system with the syscall but without support

`

``

300

`+

in the kernel.

`

``

301

`+

if e.errno != errno.ENOSYS:

`

``

302

`+

raise

`

``

303

`+

self.skipTest(e)

`

``

304

`+

else:

`

``

305

`+

The number of copied bytes can be less than

`

``

306

`+

the number of bytes originally requested.

`

``

307

`+

self.assertIn(i, range(0, bytes_to_copy+1));

`

``

308

+

``

309

`+

with open(TESTFN4, 'rb') as in_file:

`

``

310

`+

read = in_file.read()

`

``

311

`+

seeked bytes (5) are zero'ed

`

``

312

`+

self.assertEqual(read[:out_seek], b'\x00'*out_seek)

`

``

313

`+

012 are skipped (in_skip)

`

``

314

`+

345678 are copied in the file (in_skip + bytes_to_copy)

`

``

315

`+

self.assertEqual(read[out_seek:],

`

``

316

`+

data[in_skip:in_skip+i])

`

234

317

``

235

318

`# Test attributes on return values from os.stat family.

`

236

319

`class StatAttributeTests(unittest.TestCase):

`