Gestionează resursele de calcul și date Qiskit Serverless
Versiuni de pachete
Codul de pe această pagină a fost dezvoltat folosind cerințele de mai jos. Recomandăm să folosești aceste versiuni sau unele mai noi.
qiskit[all]~=2.0.0
qiskit-ibm-runtime~=0.37.0
qiskit-serverless~=0.22.0
Cu Qiskit Serverless, poți gestiona calculul și datele din pattern-ul tău Qiskit, inclusiv CPU-uri, QPU-uri și alți acceleratori de calcul.
Setează statusuri detaliate
Sarcinile de lucru Serverless au mai multe etape într-un flux de lucru. În mod implicit, următoarele statusuri sunt vizibile cu job.status():
QUEUED: sarcina de lucru este în coadă pentru resurse clasiceINITIALIZING: sarcina de lucru este configuratăRUNNING: sarcina de lucru rulează în prezent pe resurse clasiceDONE: sarcina de lucru s-a finalizat cu succes
Poți seta, de asemenea, statusuri personalizate care descriu mai detaliat etapa specifică din fluxul de lucru, după cum urmează.
# Added by doQumentation — required packages for this notebook
!pip install -q qiskit qiskit-ibm-runtime qiskit-serverless
# This cell is hidden from users, it just creates a new folder
from pathlib import Path
Path("./source_files").mkdir(exist_ok=True)
%%writefile ./source_files/status_example.py
from qiskit_serverless import update_status, Job
## If your function has a mapping stage, particularly application functions, you can set the status to "RUNNING: MAPPING" as follows:
update_status(Job.MAPPING)
## While handling transpilation, error suppression, and so forth, you can set the status to "RUNNING: OPTIMIZING_FOR_HARDWARE":
update_status(Job.OPTIMIZING_HARDWARE)
## After you submit jobs to Qiskit Runtime, the underlying quantum job will be queued. You can set status to "RUNNING: WAITING_FOR_QPU":
update_status(Job.WAITING_QPU)
## When the Qiskit Runtime job starts running on the QPU, set the following status "RUNNING: EXECUTING_QPU":
update_status(Job.EXECUTING_QPU)
## Once QPU is completed and post-processing has begun, set the status "RUNNING: POST_PROCESSING":
update_status(Job.POST_PROCESSING)
Writing ./source_files/status_example.py
După finalizarea cu succes a acestei sarcini de lucru (cu save_result()), acest status va fi actualizat automat la DONE.
Fluxuri de lucru paralele
Pentru sarcinile clasice care pot fi paralelizate, folosește decoratorul @distribute_task pentru a defini cerințele de calcul necesare pentru a efectua o sarcină. Începe prin a reapela exemplul transpile_remote.py din subiectul Scrie-ți primul program Qiskit Serverless cu următorul cod.
Următorul cod necesită să fi salvat deja credențialele.
%%writefile ./source_files/transpile_remote.py
from qiskit.transpiler import generate_preset_pass_manager
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_serverless import distribute_task
service = QiskitRuntimeService()
@distribute_task(target={"cpu": 1})
def transpile_remote(circuit, optimization_level, backend):
"""Transpiles an abstract circuit (or list of circuits) into an ISA circuit for a given backend."""
pass_manager = generate_preset_pass_manager(
optimization_level=optimization_level,
backend=service.backend(backend)
)
isa_circuit = pass_manager.run(circuit)
return isa_circuit
Writing ./source_files/transpile_remote.py
În acest exemplu, ai decorat funcția transpile_remote() cu @distribute_task(target={"cpu": 1}). La rulare, aceasta creează o sarcină worker paralelă asincronă cu un singur nucleu de CPU și returnează o referință pentru urmărirea worker-ului. Pentru a obține rezultatul, pasează referința funcției get(). Putem folosi aceasta pentru a rula mai multe sarcini paralele:
%%writefile --append ./source_files/transpile_remote.py
from time import time
from qiskit_serverless import get, get_arguments, save_result, update_status, Job
# Get arguments
arguments = get_arguments()
circuit = arguments.get("circuit")
optimization_level = arguments.get("optimization_level")
backend = arguments.get("backend")
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Start distributed transpilation
update_status(Job.OPTIMIZING_HARDWARE)
start_time = time()
transpile_worker_references = [
transpile_remote(circuit, optimization_level, backend)
for circuit in arguments.get("circuit_list")
]
transpiled_circuits = get(transpile_worker_references)
end_time = time()
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Save result, with metadata
result = {
"circuits": transpiled_circuits,
"metadata": {
"resource_usage": {
"RUNNING: OPTIMIZING_FOR_HARDWARE": {
"CPU_TIME": end_time - start_time,
"QPU_TIME": 0,
},
}
},
}
save_result(result)
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It uploads the serverless program and checks it runs.
def test_serverless_job(title, entrypoint):
# Import in function to stop them interfering with user-facing code
from qiskit.circuit.random import random_circuit
from qiskit_serverless import IBMServerlessClient, QiskitFunction
import time
import uuid
title += "_" + uuid.uuid4().hex[:8]
serverless = IBMServerlessClient()
transpile_remote_demo = QiskitFunction(
title=title,
entrypoint=entrypoint,
working_dir="./source_files/",
)
serverless.upload(transpile_remote_demo)
job = serverless.get(title).run(
circuit=random_circuit(3, 3),
circuit_list=[random_circuit(3, 3) for _ in range(3)],
backend="ibm_torino",
optimization_level=1,
)
for retry in range(25):
time.sleep(5)
status = job.status()
if status == "DONE":
print("Job completed successfully")
return
if status not in [
"QUEUED",
"INITIALIZING",
"RUNNING",
"RUNNING: OPTIMIZING_FOR_HARDWARE",
"DONE",
]:
raise Exception(
f"Unexpected job status '{status}'.\nHere's the logs:\n"
+ job.logs()
)
print(f"Waiting for job (status '{status}')")
raise Exception("Job did not complete in time")
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully
Explorează diferite configurații de sarcini
Poți aloca flexibil CPU, GPU și memorie pentru sarcinile tale prin @distribute_task(). Pentru Qiskit Serverless pe IBM Quantum® Platform, fiecare program este echipat cu 16 nuclee CPU și 32 GB RAM, care pot fi alocate dinamic după necesități.
Nucleele CPU pot fi alocate ca nuclee CPU întregi sau chiar alocări fracționale, după cum este ilustrat mai jos.
Memoria este alocată în număr de octeți. Reține că sunt 1024 de octeți într-un kilobyte, 1024 de kilobytes într-un megabyte și 1024 de megabytes într-un gigabyte. Pentru a aloca 2 GB de memorie pentru worker-ul tău, trebuie să aloci "mem": 2 * 1024 * 1024 * 1024.
%%writefile --append ./source_files/transpile_remote.py
@distribute_task(target={
"cpu": 16,
"mem": 2 * 1024 * 1024 * 1024
})
def transpile_remote(circuit, optimization_level, backend):
return None
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It checks the distributed program works.
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully
Gestionează datele din programul tău
Qiskit Serverless îți permite să gestionezi fișiere în directorul /data în toate programele tale. Aceasta include câteva limitări:
- Sunt suportate doar fișiere
tarșih5în prezent - Acesta este doar un spațiu de stocare
/dataplat și nu poate avea subdirectoare/data/folder/
Mai jos este arătat cum să încarci fișiere. Asigură-te că te-ai autentificat la Qiskit Serverless cu contul tău IBM Quantum (vezi Implementează pe IBM Quantum Platform pentru instrucțiuni).
import tarfile
from qiskit_serverless import IBMServerlessClient
# Create a tar
filename = "transpile_demo.tar"
file = tarfile.open(filename, "w")
file.add("./source_files/transpile_remote.py")
file.close()
# Get a reference to a QiskitFunction
serverless = IBMServerlessClient()
transpile_remote_demo = next(
program
for program in serverless.list()
if program.title == "transpile_remote_serverless"
)
# Upload the tar to Serverless data directory
serverless.file_upload(file=filename, function=transpile_remote_demo)
'{"message":"/usr/src/app/media/5e1f442128cdf60018496a04/transpile_demo.tar"}'
În continuare, poți lista toate fișierele din directorul tău data. Aceste date sunt accesibile tuturor programelor.
serverless.files(function=transpile_remote_demo)
['classifier_name.pkl.tar', 'output.json.tar', 'transpile_demo.tar']
Aceasta poate fi realizată dintr-un program folosind file_download() pentru a descărca fișierul în mediul programului și decomprima arhiva tar.
%%writefile ./source_files/extract_tarfile.py
import tarfile
from qiskit_serverless import IBMServerlessClient
serverless = IBMServerlessClient(token="<YOUR_API_KEY>") # Use the 44-character API_KEY you created and saved from the IBM Quantum Platform Home dashboard
files = serverless.files()
demo_file = files[0]
downloaded_tar = serverless.file_download(demo_file)
with tarfile.open(downloaded_tar, 'r') as tar:
tar.extractall()
La acest punct, programul tău poate interacționa cu fișierele, la fel ca într-un experiment local. file_upload(), file_download() și file_delete() pot fi apelate din experimentul tău local sau din programul tău încărcat, pentru o gestionare a datelor consecventă și flexibilă.
Pași următori
- Vezi un exemplu complet care portează cod existent la Qiskit Serverless.
- Citește un articol în care cercetătorii au folosit Qiskit Serverless și supercomputingul centrat pe cuantică pentru a explora chimia cuantică.