import sys from flows_sdk.blocks import CodeBlock from flows_sdk.flows import Flow, Manifest, Parameter from flows_sdk.package_utils import export_flow from flows_sdk.utils import str_to_deterministic_uuid_4, workflow_input SECRETS_IN_CCB_FLOW_IDENTIFIER = 'SECRET_IN_CCB_FLOW' SECRET_IN_CCB_FLOW_UUID = str_to_deterministic_uuid_4(SECRETS_IN_CCB_FLOW_IDENTIFIER) def entry_point_flow() -> Flow: return secrets_in_ccb_flow() class FlowInputs: SECRET_FLOW_INPUT = 'secret_flow_input' def secrets_in_ccb_flow() -> Flow: def use_secret(secret: str) -> str: import requests # type: ignore[import-untyped] headers = {'Authorization': 'Token ' + secret} requests.post( 'http://localhost:8080/api/v5/flows/dd0ad61c-6e6b-4097-a4f5-db9511e002ab/deploy', headers=headers, ) return "don't return secrets as they will leak in plain text" use_secret_ccb = CodeBlock( reference_name='use_secret', code=use_secret, code_input={'secret': workflow_input(FlowInputs.SECRET_FLOW_INPUT)}, ) return Flow( depedencies={}, title='Flow that uses secrets in a CCB', description='Flow that uses secrets in a CCB', blocks=[use_secret_ccb], owner_email='flows.sdk@hyperscience.com', manifest=Manifest( identifier=SECRETS_IN_CCB_FLOW_IDENTIFIER, input=[ ( Parameter( name=FlowInputs.SECRET_FLOW_INPUT, title='Secret string', type='string', secret=True, ) ) ], ), uuid=SECRET_IN_CCB_FLOW_UUID, ) if __name__ == '__main__': export_filename = None if len(sys.argv) > 1: export_filename = sys.argv[1] export_flow(flow=entry_point_flow(), filename=export_filename)