Initializing a new project
mkdir colink-py-example
cd colink-py-example
<aside> π‘ Note that prevalent Linux distributions have Python3 installed initially. If you donβt have Python3 installed, please install it before trying this example.
</aside>
Install the CoLink Python SDK library using the following script.
pip3 install colink
As the first step to introducing CoLink into your code, try running the following command for a sanity check; as long as there are no errors, congratulations! You are all set!
python3 -c "import colink"
<aside> π‘ Note that we are using a testing server with its host JWT. For more convenient experiment, please consider setting up your own CoLink server. You can refer to CoLink Server Setup for detailed instructions.
</aside>
from colink.sdk_a import (
CoLink,
generate_user,
get_time_stamp,
prepare_import_user_signature,
)
if __name__ == "__main__":
# Step 1. Generate a key pair for a new user.
pk, sk = generate_user()
print(f"Key pair: {pk.serialize()}:{sk.serialize()}")
# Step 2. Connect to a running colink server
# and get its public key for later usage.
# Plz refer to the documentation for how to set up a colink server.
# Here we use one of our testing server.
addr = "<https://test.colink-server.colearn.cloud>"
jwt = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJwcml2aWxlZ2UiOiJob3N0IiwidXNlcl9pZCI6IjAzNzY4N2NkYzhiMGE2ZmI3NzhjYzgwNjVmMzM5MTg3OGVjZmEwY2QxYzhlNzk2NGY1YzY0ODNlY2QzM2FjZWE3MCIsImV4cCI6MTY5NjM4MjkyNH0.vi2e7-Dn449oZq8Qpq_ihPDeohXzdLN1pAMxmx5jSkM"
cl = CoLink(addr, jwt)
_, core_pub_key = cl.request_core_info()
print(f"Core pub key: {core_pub_key}")
# Step 3. To show that you agree the server to act on behalf of you
# and also prove that you are who you claim to be,
# you need to sign a "consent" message with your secret key.
# A default expiration timestamp at 31 days later
expiration_timestamp = get_time_stamp() + 86400 * 31
signature_timestamp, sig = prepare_import_user_signature(
pk, sk, core_pub_key, expiration_timestamp
)
print(f"Sig: {sig}")
# Step 4. Import the user to the colink server.
# Note that you will need a `cl` session with a host JWT for this operation.
# You will get a new user JWT for future authentication.
result_jwt = cl.import_user(pk, signature_timestamp, expiration_timestamp, sig)
print(f"Result JWT: {result_jwt}")
<aside> π‘ Note that the user JWT in this step is generated from the previous steps.
</aside>
from colink.sdk_a import CoLink, str_to_byte
if __name__ == "__main__":
# Use the user JWT to connect to a running colink server.
addr = "<https://test.colink-server.colearn.cloud>"
jwt = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJwcml2aWxlZ2UiOiJ1c2VyIiwidXNlcl9pZCI6IjAyMThiYmNhNGVjYmQ3NzRhNDllODcxMTdiNTBmMDQ3MzliODVjN2UzNTI1YjJmM2ZhYWE1ODgxN2JjZjkxMzg4YyIsImV4cCI6MTY2OTEzODUxMX0.r7HZim5zY_jBwhcQg4dNDE_g-fRWbNYyf2NkO120HEQ"
cl = CoLink(addr, jwt)
# Then basically it is all set: now you can interact with the private storage.
# Create
key_name = "aa"
key_path = cl.create_entry(key_name, str_to_byte("v"))
print(f"Create -> {key_path}")
# Read (using key name)
print(f"Read (with key name) -> {cl.read_entry(key_name)}")
# Read (using key path)
print(f"Read (with key path) -> {cl.read_entry(key_path)}")
# Update
new_key_path = cl.update_entry(key_name, str_to_byte("vv"))
print(f"Update -> {new_key_path}")
print(f"Read again (with key name) -> {cl.read_entry(key_name)}")
print(f"Read old keypath -> {cl.read_entry(key_path)}")
# Delete
print(f"Delete -> {cl.delete_entry(key_name)}")
# notice: no "?"
print(f"Read again (with key name) - return the msg -> {cl.read_entry(key_name)}")
print(f"Read old keypath -> {cl.read_entry(key_path)}")
# List(get all entries with some prefix)
user_id = cl.get_user_id()
cl.update_entry("b", str_to_byte("example_value"))
cl.update_entry("b:c", str_to_byte("example_value"))
cl.update_entry("b:d", str_to_byte("example_value"))
print(f"List '<user_id>::*' -> {cl.read_keys(f'{user_id}:', False)}")
print(
f"List '<user_id>::*' (include history) -> {cl.read_keys(f'{user_id}:', True)}"
)
print(f"List '<user_id>::b:*' -> {cl.read_keys(f'{user_id}::b', False)}")
print(
f"List internal '<user_id>::_internal' -> {cl.read_keys(f'{user_id}::_internal', False)}"
)
# More advanced usage:
# Subscribe: you can also "subscribe" to the changes of a key
queue_name = cl.subscribe(key_name, None)
print(f"Subscription queue name -> {queue_name}")
# You can use:
# sub = cl.new_subscriber(queue_name)
# To get a new subscriber and then use
# data = sub.get_next()
# To get the next change.
# After you are done, you can use:
cl.unsubscribe(queue_name)
# To unsubscribe and offload from the colink server.
# Read or wait
# One convenient extension to subscription is the ability to read a key
# before it is even created.
# If the programmer is unsure whether the key would be ready during the execution
# and want the program to wait until the key is ready and get the value,
# they can use "read_or_wait" function.
# For how this is related to subscription, see the implementation of "read_or_wait".
cl.create_entry(key_name, str_to_byte("vvv"))
print(f"Read or wait -> {cl.read_or_wait(key_name)}")
<aside> π‘ Note that the user JWT in this step is generated from the previous steps.
</aside>
import time
import colink as CL
from colink.sdk_a import CoLink
if __name__ == "__main__":
# Use the user JWT to connect to a running colink server.
addr = "<https://test.colink-server.colearn.cloud>"
jwt = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJwcml2aWxlZ2UiOiJ1c2VyIiwidXNlcl9pZCI6IjAyZDFlNTZkMjk0YWRiMjkwYmY1NjFhNzNlZWY1OGQyYjFlMTAzNzFkNTA5ZTU1NmRiYjM3ODA0OGJlNWU4MThhNCIsImV4cCI6MTY2OTEzNjE1NH0.jRDEU_j-P4O04c2G8cgk5ffxhzjQst3QEpkaxVJQjcE"
cl = CoLink(addr, jwt)
# Then basically it is all set: now you can run protocols as tasks.
# In this demonstration, we will use one default protocol that is already started -
# the "policy manager" protocol - to demonstrate how to instantiate protocols as tasks.
# let's try to pause the policy manager for a few seconds
protocol_name = "policy_module.stop"
protocol_param = b"" # not public parameter is needed for this protocol
participants = [
CL.Participant(
user_id=cl.get_user_id(),
role="local",
)
] # in this example, the current user is required as the only participant
require_agreement = False # local protocols do not require agreements
task_id = cl.run_task(
protocol_name, protocol_param, participants, require_agreement
)
print(f"[Policy Module Stop] Task ID: {task_id}\\n")
# Note that the system is completely asynchronous,
# so getting the task id does not imply that the task is finished or even already running.
time.sleep(3)
task_id = cl.run_task(
"policy_module.start", protocol_param, participants, require_agreement
)
print(f"[Policy Module Start] Task ID: {task_id}\\n")
Think about the functionality of the protocol and write a TOML protocol specification file colink.toml
. Here we give an example from our greetings. While colink.toml
is not required when one decide to manually start the protocol operators, it is required for automatic deployment enabled by the CoLink protocol operator manager.
[package]
name = "greetings"
version = "0.1.1"
keywords = ["greetings"]
description = "greetings"
install_script = """
if { conda env list | grep 'colink-protocol-greetings'; } >/dev/null 2>&1; then
conda env remove -n colink-protocol-greetings
fi
conda create -n colink-protocol-greetings python=3.9 -y
conda activate colink-protocol-greetings
pip install colink
"""
entrypoint = """
conda activate colink-protocol-greetings
python colink-protocol-greetings.py
"""
[greetings]
name = "greetings"
description = "greetings"
Write the entry functions. In this example, in addition to showing the general skeleton of a protocol entry function, we also demonstrate how to use the set_variable
abstraction to conveniently communicate between different participants.
from typing import List
import colink as CL
from colink.sdk_a import CoLink, byte_to_str
from colink.sdk_p import ProtocolOperator
pop = ProtocolOperator(__name__)
@pop.handle("variable_transfer_example:initiator")
def run_initiator(cl: CoLink, param: bytes, participants: List[CL.Participant]):
print("initiator")
cl.set_variable("output", param, [participants[1]])
@pop.handle("variable_transfer_example:receiver")
def run_receiver(cl: CoLink, param: bytes, participants: List[CL.Participant]):
msg = cl.get_variable("output", participants[0])
print(f"{byte_to_str(msg)}")
cl.create_entry(f"tasks:{cl.get_task_id()}:output", msg)
if __name__ == "__main__":
pop.run()
Execute the protocol and attach it to a running CoLink server.
python3 protocol.py --addr <colink_server_address> --jwt <user_jwt>