Examples

This section contains examples of fully executable flows. We recommend using one of these examples as a template for building a flow to fit your exact business process.

Examples marked with a checkmark include a release and/or sample documents necessary to execute the flow in your instance of the Hyperscience Platform. You can still build your own release and test with additional documents.

V39.1

Name

Description

Release

Sample Docs

Download

IDP Starter V39.1

Basic document processing flow. Package includes release and sample documents for processing Form W-9s.

✓ ✓

idp_flow_v39_1.zip

OnError flow

Example implementation of an OnError flow that is triggered once the original flow fails.

on_error_flow_v39_1.zip

IDP Starter V39.1

The basic document processing flow. From V38 on, the IDP Document processing is split into a wrapper flow (IDP) and a subflow (IDP Core). The package includes:
  • IDP Core Flow - a read-only subflow containing the steps to process documents into machine-readable output

  • Document Processing Flow - a wrapper around the IDP Core flow, used for managing input and output connector configuration and document processing workflow settings

  • IDP Submission Notify Flow - A flow that can be configured for some blocks of the IDP Core Flow, used to send notifications to external systems when a submission has been created or is waiting for supervision

  • Submission-aware On Error Flow - When the original flow fails for any reason, and all the configured auto-retries have been exhausted, the on-error flow will be triggered. The submission-aware on-error flow enriches the error message with submission data (compared with the plain OnError flow shown below). In case this OnError implementation is used on a non-IDP flow (i.e. a submissionless flow), submission data will be omitted

The package also includes a release and sample documents for processing Form W-9s.

Download idp_flow_v39_1.zip

IDP V39.1 Sample
from uuid import UUID

from flows_sdk.error_handling import ErrorHandling, OnErrorFlow
from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v39_1 import idp_blocks, idp_values
from flows_sdk.implementations.idp_v39_1.idp_blocks import IDPCoreBlock
from flows_sdk.implementations.idp_v39_1.idp_values import (
    DEFAULT_IDP_CORE_IDENTIFIER,
    DEFAULT_IDP_SUBMISSION_NOTIFY_IDENTIFIER,
    DEFAULT_SUBMISSION_AWARE_ON_ERROR_FLOW_IDENTIFIER,
    IdpWorkflowConfig,
    IDPWrapperManifest,
    Settings,
    get_idp_wf_config,
)
from flows_sdk.package_utils import export_flow

# Stable identity of the wrapper flow. Keep these fixed for a given deployment;
# choose a different UUID/identifier if you fork this example into your own flow.
IDP_UUID = '71d32ae8-65be-4518-932a-e865be95186e'
IDP_IDENTIFIER = 'DOCUMENT_PROCESSING_EXAMPLE'
IDP_TITLE = 'Document Processing - flows-sdk example'


def idp_workflow(idp_wf_config: IdpWorkflowConfig) -> Flow:
    """Build the Document Processing wrapper flow around the IDP Core subflow.

    The wrapper owns input/output connector configuration and workflow
    settings; document processing itself happens in the IDP Core subflow
    referenced by ``DEFAULT_IDP_CORE_IDENTIFIER``.
    """
    # Notification flows default to `None` (no intermediate notifications for
    # submission change events), so opt in explicitly for every stage here.
    notify_flow = DEFAULT_IDP_SUBMISSION_NOTIFY_IDENTIFIER
    idp_wf_config.notification_workflow_initialization = notify_flow
    idp_wf_config.notification_workflow_classification = notify_flow
    idp_wf_config.notification_workflow_identification = notify_flow
    idp_wf_config.notification_workflow_transcription = notify_flow
    idp_wf_config.notification_workflow_flexible_extraction = notify_flow

    core_subflow = IDPCoreBlock(idp_wf_config, identifier=DEFAULT_IDP_CORE_IDENTIFIER)

    connector_outputs = idp_blocks.IDPOutputsBlock(
        inputs={'submission': core_subflow.output('submission')}
    )

    return Flow(
        uuid=UUID(IDP_UUID),
        owner_email='flows-sdk@hyperscience.com',
        title=IDP_TITLE,
        description='Use this flow to manage your input and output connector configuration and '
        'document processing workflow settings.',
        manifest=IDPWrapperManifest(IDP_IDENTIFIER),
        output={'submission': core_subflow.output('submission')},
        triggers=idp_values.IDPTriggers(),
        input={Settings.LayoutReleaseUuid: None},
        blocks=[core_subflow, connector_outputs],
        error_handling=ErrorHandling(
            on_error_flow=OnErrorFlow(identifier=DEFAULT_SUBMISSION_AWARE_ON_ERROR_FLOW_IDENTIFIER)
        ),
    )


def entry_point_workflow() -> Flow:
    """Entry point used by the packaging tooling: build the wrapper flow."""
    return idp_workflow(get_idp_wf_config())


if __name__ == '__main__':
    # Export the flow definition when this module is run as a script.
    export_flow(entry_point_workflow())

IDP Core V39.1 Sample
from uuid import UUID

from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v39_1 import idp_blocks
from flows_sdk.implementations.idp_v39_1.flows_version import FLOWS_VERSION, FLOWS_VERSION_DOTTED
from flows_sdk.implementations.idp_v39_1.idp_values import IDPCoreManifest
from flows_sdk.package_utils import export_flow
from flows_sdk.utils import str_to_deterministic_uuid_4

# If you are creating a custom version of this flow, choose a different UUID and identifier
# here and pass the correct identifier into IDPCoreBlock(identifier=...) within your flow.
IDP_CORE_IDENTIFIER = f'IDP_CORE_{FLOWS_VERSION}_EXAMPLE'
# The UUID is derived deterministically from the identifier so the pair stays in sync.
IDP_CORE_UUID = str(str_to_deterministic_uuid_4(IDP_CORE_IDENTIFIER))
IDP_CORE_TITLE = f'Document Processing Subflow {FLOWS_VERSION_DOTTED} - flows-sdk example'


def idp_workflow() -> Flow:
    """Assemble the read-only IDP Core subflow.

    Each stage consumes the ``submission`` output of the previous stage;
    most stages also receive the ``api_params`` captured at bootstrap.
    """
    bootstrap_submission = idp_blocks.SubmissionBootstrapBlock(
        reference_name='submission_bootstrap',
    )

    case_collation_task = idp_blocks.MachineCollationBlock(
        reference_name='machine_collation',
        submission=bootstrap_submission.output('submission'),
        cases=bootstrap_submission.output('api_params.cases'),
    )

    # Classification -> identification -> transcription -> flexible extraction,
    # alternating machine and manual stages. All of these share the same
    # constructor signature, so they are chained uniformly below.
    staged_specs = [
        (idp_blocks.MachineClassificationBlock, 'machine_classification'),
        (idp_blocks.ManualClassificationBlock, 'manual_classification'),
        (idp_blocks.MachineIdentificationBlock, 'machine_identification'),
        (idp_blocks.ManualIdentificationBlock, 'manual_identification'),
        (idp_blocks.MachineTranscriptionBlock, 'machine_transcription'),
        (idp_blocks.ManualTranscriptionBlock, 'manual_transcription'),
        (idp_blocks.FlexibleExtractionBlock, 'flexible_extraction'),
    ]
    staged_blocks = []
    previous = case_collation_task
    for block_cls, ref_name in staged_specs:
        previous = block_cls(
            reference_name=ref_name,
            submission=previous.output('submission'),
            api_params=bootstrap_submission.output('api_params'),
        )
        staged_blocks.append(previous)

    reprocessing = idp_blocks.ReprocessingBlock(
        reference_name='reprocessing',
        submission=previous.output('submission'),
    )

    submission_complete = idp_blocks.SubmissionCompleteBlock(
        reference_name='complete_submission',
        submission=reprocessing.output('submission'),
        is_reprocessing=reprocessing.output('is_reprocessing'),
    )

    return Flow(
        uuid=UUID(IDP_CORE_UUID),
        owner_email='flows-sdk@hyperscience.com',
        title=IDP_CORE_TITLE,
        description='A read-only subflow containing steps to process documents into '
        'machine-readable output.',
        manifest=IDPCoreManifest(
            flow_identifier=IDP_CORE_IDENTIFIER, roles=['read_only', 'supporting', 'idp_core']
        ),
        output={'submission': submission_complete.output('submission')},
        blocks=[
            bootstrap_submission,
            case_collation_task,
            *staged_blocks,
            reprocessing,
            submission_complete,
        ],
    )


def entry_point_workflow() -> Flow:
    """Entry point used by the packaging tooling."""
    return idp_workflow()


if __name__ == '__main__':
    # Export the flow definition when this module is run as a script.
    export_flow(entry_point_workflow())

IDP Submission Notify V39.1 Sample
import os
from uuid import UUID

from flows_sdk.blocks import Outputs
from flows_sdk.flows import Flow, Manifest, Parameter
from flows_sdk.implementations.idp_v39_1.flows_version import FLOWS_VERSION, FLOWS_VERSION_DOTTED
from flows_sdk.package_utils import export_flow
from flows_sdk.utils import icon_path_to_base64, str_to_deterministic_uuid_4, workflow_input

# Directory of this module; used to locate the bundled icon file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Stable identity of the notification flow; the UUID is derived
# deterministically from the identifier so the pair stays in sync.
IDP_SUBMISSION_NOTIFY_IDENTIFIER = f'IDP_SUBMISSION_NOTIFY_{FLOWS_VERSION}_EXAMPLE'
IDP_SUBMISSION_NOTIFY_UUID = str(str_to_deterministic_uuid_4(IDP_SUBMISSION_NOTIFY_IDENTIFIER))
IDP_SUBMISSION_NOTIFY_TITLE = (
    f'Submission State Notifications {FLOWS_VERSION_DOTTED} - flows-sdk example'
)
# Role used to filter which output connectors receive notifications.
IDP_OUTPUT_ROLE = 'idp_output'


class SubmissionNotifyInputKeys:
    """Names of the workflow inputs expected by the submission-notify flow."""

    # Key under which the Submission object is provided to the flow.
    submission: str = 'submission'


class SubmissionNotifyManifest(Manifest):
    """Manifest for the submission-notification flow.

    Declares a single hidden ``Submission`` input and tags the flow with the
    ``notifications`` / ``supporting`` roles.
    """

    def __init__(self, flow_identifier: str):
        submission_param = Parameter(
            name=SubmissionNotifyInputKeys.submission,
            type='Submission',
            title='Submission Object',
            ui={'hidden': True},
        )
        super().__init__(
            identifier=flow_identifier,
            roles=['notifications', 'supporting'],
            input=[submission_param],
            ui={
                'icon': icon_path_to_base64(os.path.join(BASE_DIR, 'submission_notify_icon.png')),
            },
        )


def idp_submission_notify_workflow() -> Flow:
    """Build the flow that notifies external systems about submission state."""
    # Fan submission data out to every output connector tagged with the IDP
    # output role; the concrete connectors are configured by the user.
    notify_outputs = Outputs(
        reference_name='outputs',
        title='Outputs',
        description=(
            'Send submission data to external systems when a submission '
            'has been created or is waiting for supervision'
        ),
        role_filter=[IDP_OUTPUT_ROLE],
        input_template={
            'submission': {'id': workflow_input('submission.id')},
            'enabled': True,
        },
        blocks=[],
    )
    return Flow(
        title=IDP_SUBMISSION_NOTIFY_TITLE,
        owner_email='flows.sdk@hyperscience.com',
        description=(
            'Send notifications to external systems when a submission has been '
            'created or is waiting for supervision. \n'
            'If the "Document Processing" flow is live, this flow '
            'must also be live, but it can be empty.'
        ),
        manifest=SubmissionNotifyManifest(IDP_SUBMISSION_NOTIFY_IDENTIFIER),
        blocks=[notify_outputs],
        uuid=UUID(IDP_SUBMISSION_NOTIFY_UUID),
        input={},
        output={},
    )


def entry_point_workflow() -> Flow:
    """Entry point used by the packaging tooling."""
    return idp_submission_notify_workflow()


if __name__ == '__main__':
    # Export the flow definition when this module is run as a script.
    export_flow(entry_point_workflow())

Submission-aware On Error Flow V39.1 Sample
from uuid import UUID

from flows_sdk.blocks import PythonBlock
from flows_sdk.flows import Flow, Manifest, Parameter
from flows_sdk.implementations.idp_v39_1.additional_blocks import (
    HyperscienceRestApiBlock,
    JSONOutputsBlock,
)
from flows_sdk.implementations.idp_v39_1.flows_version import FLOWS_VERSION, FLOWS_VERSION_DOTTED
from flows_sdk.package_utils import export_flow
from flows_sdk.types import HsTask
from flows_sdk.utils import str_to_deterministic_uuid_4, workflow_input

# Stable identity of the on-error flow; the UUID is derived deterministically
# from the identifier so the pair stays in sync.
SUBMISSION_AWARE_ON_ERROR_FLOW_IDENTIFIER = f'SUBMISSION_AWARE_ON_ERROR_FLOW_{FLOWS_VERSION}'
SUBMISSION_AWARE_ON_ERROR_FLOW_UUID = str(
    str_to_deterministic_uuid_4(SUBMISSION_AWARE_ON_ERROR_FLOW_IDENTIFIER)
)
SUBMISSION_AWARE_ON_ERROR_FLOW_TITLE = (
    f'On-Error with included Submission data {FLOWS_VERSION_DOTTED}'
)
# User-facing description rendered by the platform; this is a runtime string
# (passed to Flow(description=...)), not a docstring.
SUBMISSION_AWARE_ON_ERROR_DESCRIPTION = """Flow designed to be used as an "on-error" handler for
 submission-based flows.\n
With the on-error flow configured, when the original flow fails for any reason, and all
    the configured auto-retries have been exhausted, the on-error flow will be triggered.
\n
In this on-error flow implementation,
    we fetch the full metadata of the failed flow run, alongside with the full submission metadata
    (if the submission ID was found), and then send it to a user-configured output.
    The data format of the output conforms to
    the schema documented here: https://docs.hyperscience.com/#flows-runs and
    here: https://docs.hyperscience.com/#submissions, under the respective keys
    "flow_run" and "submission".
\n
If you need more complex logic in this flow
    please consult the documentation of Flows SDK (https://flows-sdk.hyperscience.com)
    and the available public APIs (https://docs.hyperscience.com)
"""


class OnErrorFlowInputKeys:
    """Names of the workflow inputs supplied to the on-error flow."""

    # UUID of the flow run whose failure triggered this flow.
    failed_run_uuid: str = 'failed_run_uuid'


class OnErrorManifest(Manifest):
    """Manifest declaring the single hidden input of the on-error flow."""

    def __init__(self, flow_identifier: str):
        failed_run_param = Parameter(
            name=OnErrorFlowInputKeys.failed_run_uuid,
            type='string',
            title='Failed Flow Run UUID',
            description='UUID of the failed flow run that triggered this on_error flow',
            optional=False,
            ui={'hidden': True},
        )
        super().__init__(
            identifier=flow_identifier,
            roles=['on_error', 'supporting'],
            input=[failed_run_param],
        )


def submission_aware_on_error_flow() -> Flow:
    """Build the submission-aware on-error flow.

    Fetches the full metadata of the failed flow run, looks up the related
    submission via the run's correlation ID (if any), and forwards both
    under the keys "flow_run" and "submission" to a user-configured output.
    """
    flow_run_rest_api_block = HyperscienceRestApiBlock(
        reference_name='flow_run_rest_api_block',
        title='Get Flow Run',
        description='Get the full metadata of the failed flow run',
        method='GET',
        app_endpoint=f'/api/v5/flow_runs/{workflow_input(OnErrorFlowInputKeys.failed_run_uuid)}',
    )

    def _get_correlation_id(_hs_task: HsTask) -> dict:
        # The correlation ID ties the failed flow run to its submission.
        return {'correlation_id': _hs_task.correlation_id}

    get_correlation_id_block = PythonBlock(
        reference_name='get_correlation_id',
        title='Get Correlation ID',
        description='Get the correlation ID for the failed Submission / Flow Run',
        code=_get_correlation_id,
    )

    fetch_submissions_block = HyperscienceRestApiBlock(
        reference_name='fetch_submission',
        title='Fetch Submission',
        description='Fetch the submission data for the failed flow run',
        method='GET',
        # First fragment is a plain string (no placeholders), so no f-prefix.
        app_endpoint='/api/v5/submissions'
        '?debug=true'
        f'&correlation_id={get_correlation_id_block.output("correlation_id")}',
    )

    def _prep_output_data(flow_run: dict, get_submissions_response: dict) -> dict:
        # The submissions endpoint returns a paginated list under 'results';
        # for a submissionless (non-IDP) flow the list is empty -> None.
        submissions_in_response = get_submissions_response.get('results', [])
        submission = submissions_in_response[0] if submissions_in_response else None
        return {'flow_run': flow_run, 'submission': submission}

    # NOTE(review): unlike the other blocks, this one has no explicit
    # reference_name; confirm whether the SDK derives a stable default.
    prep_output_data_block = PythonBlock(
        title='Format Submission Response',
        # Fixed: the two fragments previously concatenated without a space
        # ("...outputunder the keys...").
        description='Prepare the output data to be sent to the user-configured output '
        'under the keys "flow_run" and "submission"',
        code=_prep_output_data,
        code_input={
            'flow_run': flow_run_rest_api_block.output('result.data'),
            'get_submissions_response': fetch_submissions_block.output('result.data'),
        },
    )

    json_outputs_block = JSONOutputsBlock(inputs={'payload': prep_output_data_block.output()})

    return Flow(
        uuid=UUID(SUBMISSION_AWARE_ON_ERROR_FLOW_UUID),
        owner_email='flows-sdk@hyperscience.com',
        title=SUBMISSION_AWARE_ON_ERROR_FLOW_TITLE,
        description=SUBMISSION_AWARE_ON_ERROR_DESCRIPTION,
        manifest=OnErrorManifest(SUBMISSION_AWARE_ON_ERROR_FLOW_IDENTIFIER),
        output={},
        input={},
        blocks=[
            flow_run_rest_api_block,
            get_correlation_id_block,
            fetch_submissions_block,
            prep_output_data_block,
            json_outputs_block,
        ],
    )


def entry_point_workflow() -> Flow:
    """Entry point used by the packaging tooling."""
    return submission_aware_on_error_flow()


if __name__ == '__main__':
    # Route through entry_point_workflow() for consistency with the other
    # samples in this package (behavior is identical — it is a thin wrapper).
    export_flow(entry_point_workflow())


V39

Name

Description

Release

Sample Docs

Download

IDP Starter V39

Basic document processing flow. Package includes release and sample documents for processing Form W-9s.

✓ ✓

idp_flow_v39.zip

OnError flow

Example implementation of an OnError flow that is triggered once the original flow fails.

on_error_flow_v39.zip

IDP Starter V39

The basic document processing flow. From V38 on, the IDP Document processing is split into a wrapper flow (IDP) and a subflow (IDP Core). The package includes:
  • IDP Core Flow - a read-only subflow containing the steps to process documents into machine-readable output

  • Document Processing Flow - a wrapper around the IDP Core flow, used for managing input and output connector configuration and document processing workflow settings

  • IDP Submission Notify Flow - A flow that can be configured for some blocks of the IDP Core Flow, used to send notifications to external systems when a submission has been created or is waiting for supervision

  • Submission-aware On Error Flow - When the original flow fails for any reason, and all the configured auto-retries have been exhausted, the on-error flow will be triggered. The submission-aware on-error flow enriches the error message with submission data (compared with the plain OnError flow shown below). In case this OnError implementation is used on a non-IDP flow (i.e. a submissionless flow), submission data will be omitted

The package also includes a release and sample documents for processing Form W-9s.

Download idp_flow_v39.zip

IDP V39 Sample
from uuid import UUID

from flows_sdk.error_handling import ErrorHandling, OnErrorFlow
from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v39 import idp_blocks, idp_values
from flows_sdk.implementations.idp_v39.idp_blocks import IDPCoreBlock
from flows_sdk.implementations.idp_v39.idp_values import (
    DEFAULT_IDP_CORE_IDENTIFIER,
    DEFAULT_IDP_SUBMISSION_NOTIFY_IDENTIFIER,
    DEFAULT_SUBMISSION_AWARE_ON_ERROR_FLOW_IDENTIFIER,
    IdpWorkflowConfig,
    IDPWrapperManifest,
    Settings,
    get_idp_wf_config,
)
from flows_sdk.package_utils import export_flow

# Stable identity of the wrapper flow. Keep these fixed for a given deployment;
# choose a different UUID/identifier if you fork this example into your own flow.
IDP_UUID = '96a6606e-6c27-4423-a660-6e6c278423fb'
IDP_IDENTIFIER = 'DOCUMENT_PROCESSING_EXAMPLE'
IDP_TITLE = 'Document Processing - flows-sdk example'


def idp_workflow(idp_wf_config: IdpWorkflowConfig) -> Flow:
    """Build the Document Processing wrapper flow around the IDP Core subflow.

    The wrapper owns input/output connector configuration and workflow
    settings; document processing itself happens in the IDP Core subflow
    referenced by ``DEFAULT_IDP_CORE_IDENTIFIER``.
    """
    # Notification flows default to `None` (no intermediate notifications for
    # submission change events), so opt in explicitly for every stage here.
    notify_flow = DEFAULT_IDP_SUBMISSION_NOTIFY_IDENTIFIER
    idp_wf_config.notification_workflow_initialization = notify_flow
    idp_wf_config.notification_workflow_classification = notify_flow
    idp_wf_config.notification_workflow_identification = notify_flow
    idp_wf_config.notification_workflow_transcription = notify_flow
    idp_wf_config.notification_workflow_flexible_extraction = notify_flow

    core_subflow = IDPCoreBlock(idp_wf_config, identifier=DEFAULT_IDP_CORE_IDENTIFIER)

    connector_outputs = idp_blocks.IDPOutputsBlock(
        inputs={'submission': core_subflow.output('submission')}
    )

    return Flow(
        uuid=UUID(IDP_UUID),
        owner_email='flows-sdk@hyperscience.com',
        title=IDP_TITLE,
        description='Use this flow to manage your input and output connector configuration and '
        'document processing workflow settings.',
        manifest=IDPWrapperManifest(IDP_IDENTIFIER),
        output={'submission': core_subflow.output('submission')},
        triggers=idp_values.IDPTriggers(),
        input={Settings.LayoutReleaseUuid: None},
        blocks=[core_subflow, connector_outputs],
        error_handling=ErrorHandling(
            on_error_flow=OnErrorFlow(identifier=DEFAULT_SUBMISSION_AWARE_ON_ERROR_FLOW_IDENTIFIER)
        ),
    )


def entry_point_workflow() -> Flow:
    """Entry point used by the packaging tooling: build the wrapper flow."""
    return idp_workflow(get_idp_wf_config())


if __name__ == '__main__':
    # Export the flow definition when this module is run as a script.
    export_flow(entry_point_workflow())

IDP Core V39 Sample
from uuid import UUID

from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v39 import idp_blocks
from flows_sdk.implementations.idp_v39.idp_values import IDPCoreManifest
from flows_sdk.package_utils import export_flow

# If you are creating a custom version of this flow, choose a different UUID and identifier
# here and pass the correct identifier into IDPCoreBlock(identifier=...) within your flow.
IDP_CORE_UUID = '3c0e2602-c27d-45d5-8e26-02c27d05d58c'
IDP_CORE_IDENTIFIER = 'IDP_CORE_V39_EXAMPLE'
IDP_CORE_TITLE = 'Document Processing Subflow V39 - flows-sdk example'


def idp_workflow() -> Flow:
    """Assemble the read-only IDP Core subflow.

    Each stage consumes the ``submission`` output of the previous stage;
    most stages also receive the ``api_params`` captured at bootstrap.
    """
    bootstrap_submission = idp_blocks.SubmissionBootstrapBlock(
        reference_name='submission_bootstrap',
    )

    case_collation_task = idp_blocks.MachineCollationBlock(
        reference_name='machine_collation',
        submission=bootstrap_submission.output('submission'),
        cases=bootstrap_submission.output('api_params.cases'),
    )

    # Classification -> identification -> transcription -> flexible extraction,
    # alternating machine and manual stages. All of these share the same
    # constructor signature, so they are chained uniformly below.
    staged_specs = [
        (idp_blocks.MachineClassificationBlock, 'machine_classification'),
        (idp_blocks.ManualClassificationBlock, 'manual_classification'),
        (idp_blocks.MachineIdentificationBlock, 'machine_identification'),
        (idp_blocks.ManualIdentificationBlock, 'manual_identification'),
        (idp_blocks.MachineTranscriptionBlock, 'machine_transcription'),
        (idp_blocks.ManualTranscriptionBlock, 'manual_transcription'),
        (idp_blocks.FlexibleExtractionBlock, 'flexible_extraction'),
    ]
    staged_blocks = []
    previous = case_collation_task
    for block_cls, ref_name in staged_specs:
        previous = block_cls(
            reference_name=ref_name,
            submission=previous.output('submission'),
            api_params=bootstrap_submission.output('api_params'),
        )
        staged_blocks.append(previous)

    reprocessing = idp_blocks.ReprocessingBlock(
        reference_name='reprocessing',
        submission=previous.output('submission'),
    )

    submission_complete = idp_blocks.SubmissionCompleteBlock(
        reference_name='complete_submission',
        submission=reprocessing.output('submission'),
        is_reprocessing=reprocessing.output('is_reprocessing'),
    )

    return Flow(
        uuid=UUID(IDP_CORE_UUID),
        owner_email='flows-sdk@hyperscience.com',
        title=IDP_CORE_TITLE,
        description='A read-only subflow containing steps to process documents into '
        'machine-readable output.',
        manifest=IDPCoreManifest(
            flow_identifier=IDP_CORE_IDENTIFIER, roles=['read_only', 'supporting', 'idp_core']
        ),
        output={'submission': submission_complete.output('submission')},
        blocks=[
            bootstrap_submission,
            case_collation_task,
            *staged_blocks,
            reprocessing,
            submission_complete,
        ],
    )


def entry_point_workflow() -> Flow:
    """Entry point used by the packaging tooling."""
    return idp_workflow()


if __name__ == '__main__':
    # Export the flow definition when this module is run as a script.
    export_flow(entry_point_workflow())

IDP Submission Notify V39 Sample
import os

from flows_sdk.blocks import Outputs
from flows_sdk.flows import Flow, Manifest
from flows_sdk.package_utils import export_flow
from flows_sdk.utils import icon_path_to_base64, workflow_input

# Directory of this module; used to locate the bundled icon file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Stable identity of the notification flow for the V39 release line.
IDP_SUBMISSION_NOTIFY_IDENTIFIER = 'IDP_SUBMISSION_NOTIFY_V39_EXAMPLE'
IDP_SUBMISSION_NOTIFY_UUID = 'cf09ba7b-664d-48c5-89ba-7b664df8c518'
IDP_SUBMISSION_NOTIFY_TITLE = 'Submission State Notifications V39 - flows-sdk example'

# Role used to filter which output connectors receive notifications.
IDP_OUTPUT_ROLE = 'idp_output'


def idp_submission_notify_workflow() -> Flow:
    """Build the flow that notifies external systems about submission state."""
    # Fan submission data out to every output connector tagged with the IDP
    # output role; the concrete connectors are configured by the user.
    notify_outputs = Outputs(
        reference_name='outputs',
        title='Outputs',
        description=(
            'Send submission data to external systems when a submission '
            'has been created or is waiting for supervision'
        ),
        role_filter=[IDP_OUTPUT_ROLE],
        input_template={
            'submission': {'id': workflow_input('submission.id')},
            'enabled': True,
        },
        blocks=[],
    )
    return Flow(
        title=IDP_SUBMISSION_NOTIFY_TITLE,
        owner_email='flows.sdk@hyperscience.com',
        description=(
            'Send notifications to external systems when a submission has been '
            'created or is waiting for supervision. \n'
            'If the "Document Processing" flow is live, this flow '
            'must also be live, but it can be empty.'
        ),
        is_user_facing=True,
        blocks=[notify_outputs],
        uuid=IDP_SUBMISSION_NOTIFY_UUID,
        input={},
        output={},
        # Manifest is declared inline here (the V39.1 sample uses a subclass).
        manifest=Manifest(
            identifier=IDP_SUBMISSION_NOTIFY_IDENTIFIER,
            input=[
                {
                    'name': 'submission',
                    'type': 'Submission',
                    'title': 'Submission Object',
                    'ui': {'hidden': True},
                }
            ],
            output=[],
            roles=['notifications', 'supporting'],
            ui={
                'hidden': True,
                'icon': icon_path_to_base64(os.path.join(BASE_DIR, 'notification_flow_icon.png')),
            },
        ),
    )


def entry_point_workflow() -> Flow:
    """Entry point used by the packaging tooling."""
    return idp_submission_notify_workflow()


if __name__ == '__main__':
    # Export the flow definition when this module is run as a script.
    export_flow(entry_point_workflow())

Submission-aware On Error Flow V39 Sample
from uuid import UUID

from flows_sdk.blocks import PythonBlock
from flows_sdk.flows import Flow, Manifest, Parameter
from flows_sdk.implementations.idp_v39.additional_blocks import (
    HyperscienceRestApiBlock,
    JSONOutputsBlock,
)
from flows_sdk.package_utils import export_flow
from flows_sdk.types import HsTask
from flows_sdk.utils import workflow_input

# Stable identity of the on-error flow for the V39 release line.
SUBMISSION_AWARE_ON_ERROR_FLOW_UUID = '909d73bc-c220-439d-9d73-bcc220f39d3d'
SUBMISSION_AWARE_ON_ERROR_FLOW_IDENTIFIER = 'SUBMISSION_AWARE_ON_ERROR_FLOW_V39_EXAMPLE'
SUBMISSION_AWARE_ON_ERROR_FLOW_TITLE = (
    'On-Error with included Submission data V39 - flows-sdk example'
)
# User-facing description rendered by the platform; this is a runtime string
# (passed to Flow(description=...)), not a docstring.
SUBMISSION_AWARE_ON_ERROR_DESCRIPTION = """Flow designed to be used as an "on-error" handler for
 submission-based flows.\n
With the on-error flow configured, when the original flow fails for any reason, and all
    the configured auto-retries have been exhausted, the on-error flow will be triggered.
\n
In this on-error flow implementation,
    we fetch the full metadata of the failed flow run, alongside with the full submission metadata
    (if the submission ID was found), and then send it to a user-configured output.
    The data format of the output conforms to
    the schema documented here: https://docs.hyperscience.com/#flows-runs and
    here: https://docs.hyperscience.com/#submissions, under the respective keys
    "flow_run" and "submission".
\n
If you need more complex logic in this flow
    please consult the documentation of Flows SDK (https://flows-sdk.hyperscience.com)
    and the available public APIs (https://docs.hyperscience.com)
"""


class OnErrorFlowInputKeys:
    """Names of the workflow inputs supplied to the on-error flow."""

    # UUID of the flow run whose failure triggered this flow.
    failed_run_uuid: str = 'failed_run_uuid'


class OnErrorManifest(Manifest):
    """Manifest declaring the single hidden input of the on-error flow."""

    def __init__(self, flow_identifier: str):
        failed_run_param = Parameter(
            name=OnErrorFlowInputKeys.failed_run_uuid,
            type='string',
            title='Failed Flow Run UUID',
            description='UUID of the failed flow run that triggered this on_error flow',
            optional=False,
            ui={'hidden': True},
        )
        super().__init__(
            identifier=flow_identifier,
            roles=['on_error', 'supporting'],
            input=[failed_run_param],
        )


def submission_aware_on_error_flow() -> Flow:
    """Build the submission-aware on-error flow.

    Fetches the full metadata of the failed flow run, looks up the related
    submission via the run's correlation ID (if any), and forwards both
    under the keys "flow_run" and "submission" to a user-configured output.
    """
    flow_run_rest_api_block = HyperscienceRestApiBlock(
        reference_name='flow_run_rest_api_block',
        title='Get Flow Run',
        description='Get the full metadata of the failed flow run',
        method='GET',
        app_endpoint=f'/api/v5/flow_runs/{workflow_input(OnErrorFlowInputKeys.failed_run_uuid)}',
    )

    def _get_correlation_id(_hs_task: HsTask) -> dict:
        # The correlation ID ties the failed flow run to its submission.
        return {'correlation_id': _hs_task.correlation_id}

    get_correlation_id_block = PythonBlock(
        reference_name='get_correlation_id',
        title='Get Correlation ID',
        description='Get the correlation ID for the failed Submission / Flow Run',
        code=_get_correlation_id,
    )

    fetch_submissions_block = HyperscienceRestApiBlock(
        reference_name='fetch_submission',
        title='Fetch Submission',
        description='Fetch the submission data for the failed flow run',
        method='GET',
        # First fragment is a plain string (no placeholders), so no f-prefix.
        app_endpoint='/api/v5/submissions'
        '?debug=true'
        f'&correlation_id={get_correlation_id_block.output("correlation_id")}',
    )

    def _prep_output_data(flow_run: dict, get_submissions_response: dict) -> dict:
        # The submissions endpoint returns a paginated list under 'results';
        # for a submissionless (non-IDP) flow the list is empty -> None.
        submissions_in_response = get_submissions_response.get('results', [])
        submission = submissions_in_response[0] if submissions_in_response else None
        return {'flow_run': flow_run, 'submission': submission}

    # NOTE(review): unlike the other blocks, this one has no explicit
    # reference_name; confirm whether the SDK derives a stable default.
    prep_output_data_block = PythonBlock(
        title='Format Submission Response',
        # Fixed: the two fragments previously concatenated without a space
        # ("...outputunder the keys...").
        description='Prepare the output data to be sent to the user-configured output '
        'under the keys "flow_run" and "submission"',
        code=_prep_output_data,
        code_input={
            'flow_run': flow_run_rest_api_block.output('result.data'),
            'get_submissions_response': fetch_submissions_block.output('result.data'),
        },
    )

    json_outputs_block = JSONOutputsBlock(inputs={'payload': prep_output_data_block.output()})

    return Flow(
        uuid=UUID(SUBMISSION_AWARE_ON_ERROR_FLOW_UUID),
        owner_email='flows-sdk@hyperscience.com',
        title=SUBMISSION_AWARE_ON_ERROR_FLOW_TITLE,
        description=SUBMISSION_AWARE_ON_ERROR_DESCRIPTION,
        manifest=OnErrorManifest(SUBMISSION_AWARE_ON_ERROR_FLOW_IDENTIFIER),
        output={},
        input={},
        blocks=[
            flow_run_rest_api_block,
            get_correlation_id_block,
            fetch_submissions_block,
            prep_output_data_block,
            json_outputs_block,
        ],
    )


def entry_point_workflow() -> Flow:
    """Package entry point for exporting the submission-aware on-error flow."""
    built_flow = submission_aware_on_error_flow()
    return built_flow


if __name__ == '__main__':
    # Consistency fix: export via entry_point_workflow() so that running this
    # module directly and loading it as a package build the exact same flow
    # (every other sample in this file follows that pattern).
    export_flow(entry_point_workflow())


OnError flow

This example shows how to produce an error message when a flow fails. The package includes the source code for an OnError flow, which can be configured to the IDP flow.

With the on-error flow configured, when the original flow fails for any reason, and all the configured auto-retries have been exhausted, the on-error flow will be triggered.

on_error_flow_v39_1.zip


OnError flow
from uuid import UUID

from flows_sdk.flows import Flow, Manifest, Parameter
from flows_sdk.implementations.idp_v39.additional_blocks import (
    HyperscienceRestApiBlock,
    JSONOutputsBlock,
)
from flows_sdk.package_utils import export_flow
from flows_sdk.utils import workflow_input

# Stable identity for this flow: the UUID uniquely identifies the flow release
# and the identifier is the name other flows/configs use to reference it.
ON_ERROR_FLOW_UUID = 'd1cce845-07ec-4bd4-9a9f-cb73b39b68e6'
ON_ERROR_FLOW_IDENTIFIER = 'ON_ERROR_V39'
ON_ERROR_TITLE = 'On Error V39'
# User-facing description; adjacent string literals are implicitly
# concatenated, with the '\n\n' fragments acting as paragraph breaks.
ON_ERROR_DESCRIPTION = (
    'Flow designed to be used as an "on-error" handler for other flows.'
    '\n\n'
    'With the on-error flow configured, when the original flow fails for any reason, and all '
    'the configured auto-retries have been exhausted, the on-error flow will be triggered.'
    '\n\n'
    'The input it receives is the ID of the failed flow run, which can be used to query '
    'additional information about the failure.'
    '\n\n'
    'In this on-error flow implementation, '
    'we fetch the full metadata of the failed flow run and then send it '
    'to a user-configured output. '
    'The data format of the output conforms to '
    'the schema documented here: https://docs.hyperscience.com/#flows-runs.'
    '\n\n'
    'If you need more complex logic in this flow, '
    'please consult the documentation of Flows SDK (https://flows-sdk.hyperscience.com) '
    'and the available public APIs (https://docs.hyperscience.com)'
)


class OnErrorFlowInputKeys:
    """Names of the workflow inputs this on-error flow receives."""

    # UUID of the flow run whose failure triggered this flow.
    failed_run_uuid: str = 'failed_run_uuid'


class OnErrorManifest(Manifest):
    """Manifest marking this flow as an on-error handler.

    Per the flow description above, a flow with the 'on_error' role is
    triggered when the original flow fails and auto-retries are exhausted;
    it receives the failed run's UUID as a required, hidden input.
    """

    def __init__(self, flow_identifier: str):
        super().__init__(
            identifier=flow_identifier,
            roles=['on_error', 'supporting'],
            input=[
                Parameter(
                    name=OnErrorFlowInputKeys.failed_run_uuid,
                    type='string',
                    title='Failed Flow Run UUID',
                    description='UUID of the failed flow run that triggered this on_error flow',
                    optional=False,
                    # Hidden: supplied by the platform, not edited by users.
                    ui={'hidden': True},
                ),
            ],
        )


def on_error_flow() -> Flow:
    """Build the on-error flow: fetch the failed run's metadata and emit it.

    The flow has two blocks: a REST call that retrieves the failed flow run
    by UUID, and a JSON outputs block that forwards the response payload to
    the user-configured output.
    """
    failed_run_ref = workflow_input(OnErrorFlowInputKeys.failed_run_uuid)

    fetch_failed_run = HyperscienceRestApiBlock(
        reference_name='flow_run_rest_api_block',
        method='GET',
        app_endpoint=f'/api/v5/flow_runs/{failed_run_ref}',
    )

    emit_payload = JSONOutputsBlock(
        inputs={'payload': fetch_failed_run.output('result.data')}
    )

    return Flow(
        uuid=UUID(ON_ERROR_FLOW_UUID),
        owner_email='flows-sdk@hyperscience.com',
        title=ON_ERROR_TITLE,
        description=ON_ERROR_DESCRIPTION,
        manifest=OnErrorManifest(ON_ERROR_FLOW_IDENTIFIER),
        output={},
        input={},
        blocks=[fetch_failed_run, emit_payload],
    )


def entry_point_workflow() -> Flow:
    """Package entry point for exporting the on-error flow."""
    built_flow = on_error_flow()
    return built_flow


if __name__ == '__main__':
    # Running this module directly exports the flow as a deployable package.
    export_flow(entry_point_workflow())


V38

Name

Description

Release

Sample Docs

Download

IDP Starter V38

Basic document processing flow. Package includes release and sample documents for processing Form W-9s.

../_images/checkmark.png ../_images/checkmark.png

idp_flow_v38.zip

Submission-aware OnError flow

Example implementation of an OnError flow that enriches the error message with submission information.

submission_aware_on_error.zip

IDP Starter V38

The basic document processing flow. From V38 on, the IDP Document processing is split into a wrapper flow (IDP) and a subflow (IDP Core). The package includes:
  • IDP Core Flow - a read-only subflow containing the steps to process documents into machine-readable output

  • Document Processing Flow - a wrapper around the IDP Core flow, used for managing input and output connector configuration and document processing workflow settings

  • IDP Submission Notify Flow - A flow that can be configured for some blocks of the IDP Core Flow, used to send notifications to external systems when a submission has been created or is waiting for supervision

  • On Error Flow - A flow that is used as an “on-error” handler for other flows. With the on-error flow configured, when the original flow fails for any reason, and all the configured auto-retries have been exhausted, the on-error flow will be triggered. It can be configured to the IDP flow.

The package also includes release and sample documents for processing Form W-9s.

Download idp_flow_v38.zip

IDP V38 Sample
from uuid import UUID

from flows_sdk.error_handling import ErrorHandling, OnErrorFlow
from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v38 import idp_blocks, idp_values
from flows_sdk.implementations.idp_v38.idp_blocks import IDPCoreBlock
from flows_sdk.implementations.idp_v38.idp_values import (
    DEFAULT_IDP_CORE_IDENTIFIER,
    DEFAULT_IDP_SUBMISSION_NOTIFY_IDENTIFIER,
    DEFAULT_SUBMISSION_AWARE_ON_ERROR_FLOW_IDENTIFIER,
    IdpWorkflowConfig,
    IDPWrapperManifest,
    Settings,
    get_idp_wf_config,
)
from flows_sdk.package_utils import export_flow

# Stable identity for the wrapper (Document Processing) flow release.
IDP_UUID = 'dab9f90b-e9ec-421f-a2cc-988ebdcf21f1'
IDP_IDENTIFIER = 'DOCUMENT_PROCESSING_EXAMPLE'
IDP_TITLE = 'Document Processing - flows-sdk example'


def idp_workflow(idp_wf_config: IdpWorkflowConfig) -> Flow:
    """Build the V38 wrapper flow around the IDP Core subflow.

    Enables the default submission-notify flow for every notification stage,
    wires the core subflow's 'submission' output into the outputs block, and
    attaches the submission-aware on-error handler.
    """
    # By default every notification flow is `None` (no intermediate
    # notifications for submission change events), so opt each stage into
    # the default submission-notify flow explicitly.
    for notify_attr in (
        'notification_workflow_initialization',
        'notification_workflow_classification',
        'notification_workflow_identification',
        'notification_workflow_transcription',
        'notification_workflow_flexible_extraction',
    ):
        setattr(idp_wf_config, notify_attr, DEFAULT_IDP_SUBMISSION_NOTIFY_IDENTIFIER)

    core_block = IDPCoreBlock(idp_wf_config, identifier=DEFAULT_IDP_CORE_IDENTIFIER)

    outputs_block = idp_blocks.IDPOutputsBlock(
        inputs={'submission': core_block.output('submission')}
    )

    return Flow(
        uuid=UUID(IDP_UUID),
        owner_email='flows-sdk@hyperscience.com',
        title=IDP_TITLE,
        description='Use this flow to manage your input and output connector configuration and '
        'document processing workflow settings.',
        manifest=IDPWrapperManifest(IDP_IDENTIFIER),
        output={'submission': core_block.output('submission')},
        triggers=idp_values.IDPTriggers(),
        input={
            Settings.LayoutReleaseUuid: None,
        },
        blocks=[core_block, outputs_block],
        error_handling=ErrorHandling(
            on_error_flow=OnErrorFlow(identifier=DEFAULT_SUBMISSION_AWARE_ON_ERROR_FLOW_IDENTIFIER)
        ),
    )


def entry_point_workflow() -> Flow:
    """Package entry point: build the wrapper flow from the default config."""
    return idp_workflow(get_idp_wf_config())


if __name__ == '__main__':
    # Running this module directly exports the flow as a deployable package.
    export_flow(entry_point_workflow())

IDP Core V38 Sample
from uuid import UUID

from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v38 import idp_blocks
from flows_sdk.implementations.idp_v38.idp_values import IDPCoreManifest
from flows_sdk.package_utils import export_flow

# If you are creating a custom version of this flow, choose a different UUID and identifier
# here and pass the correct identifier into IDPCoreBlock(identifier=...) within your flow.
IDP_CORE_UUID = 'dca93e70-c742-419b-8d70-2ac41ea027c2'  # unique per flow release
IDP_CORE_IDENTIFIER = 'IDP_CORE_V38_EXAMPLE'  # referenced via IDPCoreBlock(identifier=...)
IDP_CORE_TITLE = 'Document Processing Subflow V38 - flows-sdk example'


def idp_workflow() -> Flow:
    """Build the read-only IDP Core V38 subflow.

    The blocks form a linear pipeline: each stage consumes the previous
    stage's 'submission' output, and most stages also read the shared
    'api_params' produced by the bootstrap block. The completed submission
    is exposed as the flow's 'submission' output.
    """
    # Create the submission record and expose submission-level API params.
    bootstrap_submission = idp_blocks.SubmissionBootstrapBlock(
        reference_name='submission_bootstrap',
    )

    # Collate pages into cases based on the API 'cases' parameter.
    case_collation_task = idp_blocks.MachineCollationBlock(
        reference_name='machine_collation',
        submission=bootstrap_submission.output('submission'),
        cases=bootstrap_submission.output('api_params.cases'),
    )

    # Machine classification stage.
    machine_classification = idp_blocks.MachineClassificationBlock(
        reference_name='machine_classification',
        submission=case_collation_task.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
    )

    # Manual classification stage.
    manual_classification = idp_blocks.ManualClassificationBlock(
        reference_name='manual_classification',
        submission=machine_classification.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
    )

    # Machine identification stage.
    machine_identification = idp_blocks.MachineIdentificationBlock(
        reference_name='machine_identification',
        submission=manual_classification.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
    )

    # Manual identification stage.
    manual_identification = idp_blocks.ManualIdentificationBlock(
        reference_name='manual_identification',
        submission=machine_identification.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
    )

    # Machine transcription stage.
    machine_transcription = idp_blocks.MachineTranscriptionBlock(
        reference_name='machine_transcription',
        submission=manual_identification.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
    )

    # Manual transcription stage.
    manual_transcription = idp_blocks.ManualTranscriptionBlock(
        reference_name='manual_transcription',
        submission=machine_transcription.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
    )

    # Flexible extraction stage.
    flexible_extraction = idp_blocks.FlexibleExtractionBlock(
        reference_name='flexible_extraction',
        submission=manual_transcription.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
    )

    # Reprocessing stage; also reports whether this run is a reprocess
    # via its 'is_reprocessing' output.
    reprocessing = idp_blocks.ReprocessingBlock(
        reference_name='reprocessing',
        submission=flexible_extraction.output('submission'),
    )

    # Mark the submission complete, ending the pipeline.
    submission_complete = idp_blocks.SubmissionCompleteBlock(
        reference_name='complete_submission',
        submission=reprocessing.output('submission'),
        is_reprocessing=reprocessing.output('is_reprocessing'),
    )

    return Flow(
        uuid=UUID(IDP_CORE_UUID),
        owner_email='flows-sdk@hyperscience.com',
        title=IDP_CORE_TITLE,
        description='A read-only subflow containing steps to process documents into '
        'machine-readable output.',
        # 'read_only' / 'idp_core' roles mark this as the core subflow used
        # by the wrapper Document Processing flow.
        manifest=IDPCoreManifest(
            flow_identifier=IDP_CORE_IDENTIFIER, roles=['read_only', 'supporting', 'idp_core']
        ),
        output={'submission': submission_complete.output('submission')},
        blocks=[
            bootstrap_submission,
            case_collation_task,
            machine_classification,
            manual_classification,
            machine_identification,
            manual_identification,
            machine_transcription,
            manual_transcription,
            flexible_extraction,
            reprocessing,
            submission_complete,
        ],
    )


def entry_point_workflow() -> Flow:
    """Package entry point for exporting the IDP Core subflow."""
    built_flow = idp_workflow()
    return built_flow


if __name__ == '__main__':
    # Running this module directly exports the flow as a deployable package.
    export_flow(entry_point_workflow())

IDP Submission Notify V38 Sample
import os

from flows_sdk.blocks import Outputs
from flows_sdk.flows import Flow, Manifest
from flows_sdk.package_utils import export_flow
from flows_sdk.utils import icon_path_to_base64, workflow_input

# Directory of this file; used below to locate the packaged flow icon.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Stable identity for the notification flow release.
IDP_SUBMISSION_NOTIFY_IDENTIFIER = 'IDP_SUBMISSION_NOTIFY_V38_EXAMPLE'
IDP_SUBMISSION_NOTIFY_UUID = 'cf22dd1e-d013-4d09-b598-40696fa9a322'
# NOTE(review): this title constant is not referenced by the Flow below,
# which hard-codes its own title string — confirm which one is intended.
IDP_SUBMISSION_NOTIFY_TITLE = 'Submission State Notifications V38 - flows-sdk example'

# Role used to filter which output connectors run inside the Outputs block.
IDP_OUTPUT_ROLE = 'idp_output'


def idp_submission_notify_workflow() -> Flow:
    """Build the V38 submission-state notification flow.

    Contains a single Outputs block that forwards the submission ID to any
    output connector carrying the IDP output role.
    """
    outputs_block = Outputs(
        reference_name='outputs',
        title='Outputs',
        description=(
            'Send submission data to external systems when a submission '
            'has been created or is waiting for supervision'
        ),
        role_filter=[IDP_OUTPUT_ROLE],
        input_template={
            'submission': {'id': workflow_input('submission.id')},
            'enabled': True,
        },
        blocks=[],
    )

    notify_manifest = Manifest(
        identifier=IDP_SUBMISSION_NOTIFY_IDENTIFIER,
        input=[
            {
                'name': 'submission',
                'type': 'Submission',
                'title': 'Submission Object',
                'ui': {'hidden': True},
            }
        ],
        output=[],
        roles=['notifications', 'supporting'],
        ui={
            'hidden': True,
            'icon': icon_path_to_base64(os.path.join(BASE_DIR, 'notification_flow_icon.png')),
        },
    )

    return Flow(
        title='Submission State Notifications V38',
        owner_email='flows.sdk@hyperscience.com',
        description=(
            'Send notifications to external systems when a submission has been '
            'created or is waiting for supervision. \n'
            'If the "Document Processing" flow is live, this flow '
            'must also be live, but it can be empty.'
        ),
        is_user_facing=True,
        blocks=[outputs_block],
        # NOTE(review): passed as a plain str here, unlike the UUID(...) used
        # by the other sample flows — confirm Flow accepts both forms.
        uuid=IDP_SUBMISSION_NOTIFY_UUID,
        input={},
        output={},
        manifest=notify_manifest,
    )


def entry_point_workflow() -> Flow:
    """Package entry point for exporting the notification flow."""
    built_flow = idp_submission_notify_workflow()
    return built_flow


if __name__ == '__main__':
    # Running this module directly exports the flow as a deployable package.
    export_flow(entry_point_workflow())

On Error Flow V38 Sample
from uuid import UUID

from flows_sdk.flows import Flow, Manifest, Parameter
from flows_sdk.implementations.idp_v38.additional_blocks import (
    HyperscienceRestApiBlock,
    JSONOutputsBlock,
)
from flows_sdk.package_utils import export_flow
from flows_sdk.utils import workflow_input

# Stable identity for this on-error flow release.
ON_ERROR_FLOW_UUID = '46ad83ad-5a9c-46c5-a322-e2aa8e40bdce'
ON_ERROR_FLOW_TASK_NAME = 'ON_ERROR_V38_EXAMPLE'
# User-facing description. NOTE(review): this is a triple-quoted literal, so
# the continuation-line indentation is part of the displayed text — confirm
# it renders as intended in the UI.
ON_ERROR_DESCRIPTION = """Flow designed to be used as an "on-error" handler for other flows.
\n\n
With the on-error flow configured, when the original flow fails for any reason, and all
    the configured auto-retries have been exhausted, the on-error flow will be triggered.
\n\n
The input it receives is the ID of the failed flow run, which can be used to query
    additional information about the failure.
\n\n
In this on-error flow implementation,
    we fetch the full metadata of the failed flow run and then send it
    to a user-configured output.
    The data format of the output conforms to
    the schema documented here: https://docs.hyperscience.com/#flows-runs.
\n\n
If you need more complex logic in this flow
    please consult the documentation of Flows SDK (https://flows-sdk.hyperscience.com)
    and the available public APIs (https://docs.hyperscience.com)
"""


class OnErrorFlowInputKeys:
    """Names of the workflow inputs this on-error flow receives."""

    # UUID of the flow run whose failure triggered this flow.
    failed_run_uuid: str = 'failed_run_uuid'


class OnErrorManifest(Manifest):
    """Manifest marking this flow as an on-error handler.

    Per the flow description above, a flow with the 'on_error' role is
    triggered when the original flow fails and auto-retries are exhausted;
    it receives the failed run's UUID as a required, hidden input.
    """

    def __init__(self, flow_identifier: str):
        super().__init__(
            identifier=flow_identifier,
            roles=['on_error', 'supporting'],
            input=[
                Parameter(
                    name=OnErrorFlowInputKeys.failed_run_uuid,
                    type='string',
                    title='Failed Flow Run UUID',
                    description='UUID of the failed flow run that triggered this on_error flow',
                    optional=False,
                    # Hidden: supplied by the platform, not edited by users.
                    ui={'hidden': True},
                ),
            ],
        )


def on_error_flow() -> Flow:
    """Build the V38 on-error flow: fetch the failed run's metadata and emit it.

    Two blocks: a REST call that retrieves the failed flow run by UUID, and
    a JSON outputs block forwarding the response to the configured output.
    """
    failed_run_ref = workflow_input(OnErrorFlowInputKeys.failed_run_uuid)

    fetch_failed_run = HyperscienceRestApiBlock(
        reference_name='flow_run_rest_api_block',
        method='GET',
        app_endpoint=f'/api/v5/flow_runs/{failed_run_ref}',
    )

    emit_payload = JSONOutputsBlock(
        inputs={'payload': fetch_failed_run.output('result.data')}
    )

    return Flow(
        uuid=UUID(ON_ERROR_FLOW_UUID),
        owner_email='flows.sdk@hyperscience.com',
        title='On-Error V38 - flows-sdk',
        description=ON_ERROR_DESCRIPTION,
        manifest=OnErrorManifest(ON_ERROR_FLOW_TASK_NAME),
        output={},
        input={},
        blocks=[fetch_failed_run, emit_payload],
    )


def entry_point_workflow() -> Flow:
    """Package entry point for exporting the on-error flow."""
    built_flow = on_error_flow()
    return built_flow


if __name__ == '__main__':
    # Running this module directly exports the flow as a deployable package.
    export_flow(entry_point_workflow())


Submission-aware OnError flow

This example shows how to enrich an error message with submission information. The package includes the source code for a Submission-aware OnError flow, which can be configured for the IDP flow.

With the on-error flow configured, when the original flow fails for any reason, and all the configured auto-retries have been exhausted, the on-error flow will be triggered. In case this OnError implementation is used on a non-IDP flow (i.e. a submissionless flow), submission data will be omitted.

submission_aware_on_error.zip


Submission-aware OnError flow
import os
from uuid import UUID

from flows_sdk.blocks import PythonBlock
from flows_sdk.flows import Flow, Manifest, Parameter
from flows_sdk.implementations.idp_v38.additional_blocks import (
    HyperscienceRestApiBlock,
    JSONOutputsBlock,
)
from flows_sdk.package_utils import export_flow
from flows_sdk.types import HsTask
from flows_sdk.utils import workflow_input

# Stable identity for the submission-aware on-error flow release.
ON_ERROR_FLOW_UUID = '1e8605e7-db3f-4fae-ac25-029a7ffd4ba4'
ON_ERROR_FLOW_TASK_NAME = 'SUBMISSION_AWARE_ON_ERROR_FLOW'
# User-facing description; .lstrip() drops the leading newline of the
# triple-quoted literal. NOTE(review): continuation-line indentation is part
# of the displayed text — confirm it renders as intended in the UI.
ON_ERROR_DESCRIPTION = """
Flow designed to be used as an "on-error" handler for submission-based flows.
\n
With the on-error flow configured, when the original flow fails for any reason, and all
    the configured auto-retries have been exhausted, the on-error flow will be triggered.
\n
In this on-error flow implementation,
    we fetch the full metadata of the failed flow run, alongside with the full submission metadata
    (if the submission ID was found), and then send it to a user-configured output.
    The data format of the output conforms to
    the schema documented here: https://docs.hyperscience.com/#flows-runs and
    here: https://docs.hyperscience.com/#submissions, under the respective keys
    "flow_run" and "submission".
\n
If you need more complex logic in this flow
    please consult the documentation of Flows SDK (https://flows-sdk.hyperscience.com)
    and the available public APIs (https://docs.hyperscience.com)
""".lstrip()
# NOTE(review): not referenced anywhere in this module's visible code —
# possibly used by packaging; confirm before removing.
CURRENT_DIRECTORY = os.path.dirname(os.path.realpath(__file__))


class OnErrorFlowInputKeys:
    """Names of the workflow inputs this on-error flow receives."""

    # UUID of the flow run whose failure triggered this flow.
    failed_run_uuid: str = 'failed_run_uuid'


class OnErrorManifest(Manifest):
    """Manifest marking this flow as an on-error handler.

    Per the flow description above, a flow with the 'on_error' role is
    triggered when the original flow fails and auto-retries are exhausted;
    it receives the failed run's UUID as a required, hidden input.
    """

    def __init__(self, flow_identifier: str):
        super().__init__(
            identifier=flow_identifier,
            roles=['on_error', 'supporting'],
            input=[
                Parameter(
                    name=OnErrorFlowInputKeys.failed_run_uuid,
                    type='string',
                    title='Failed Flow Run UUID',
                    description='UUID of the failed flow run that triggered this on_error flow',
                    optional=False,
                    # Hidden: supplied by the platform, not edited by users.
                    ui={'hidden': True},
                )
            ],
        )


def on_error_flow() -> Flow:
    """Build the on-error flow that enriches the failure report with submission data.

    Pipeline: fetch the failed flow run -> resolve its correlation ID ->
    look up the matching submission -> merge both into one payload under the
    keys 'flow_run' and 'submission' -> send it to the configured JSON output.
    """
    flow_run_rest_api_block = HyperscienceRestApiBlock(
        reference_name='flow_run_rest_api_block',
        title='Get Flow Run',
        description='Get the full metadata of the failed flow run',
        method='GET',
        app_endpoint=f'/api/v5/flow_runs/{workflow_input(OnErrorFlowInputKeys.failed_run_uuid)}',
    )

    def _get_correlation_id(_hs_task: HsTask) -> dict:
        # Expose the task's correlation ID so the next block can query the
        # submission it corresponds to.
        return {'correlation_id': _hs_task.correlation_id}

    get_correlation_id_block = PythonBlock(
        reference_name='get_correlation_id',
        title='Get Correlation ID',
        description='Get the correlation ID for the failed Submission / Flow Run',
        code=_get_correlation_id,
    )

    fetch_submissions_block = HyperscienceRestApiBlock(
        reference_name='fetch_submission',
        title='Fetch Submission',
        description='Fetch the submission data for the failed flow run',
        method='GET',
        # Cleanup: dropped the redundant f-prefix on the first fragment
        # (it contains no placeholders).
        app_endpoint='/api/v5/submissions'
        '?debug=true'
        f'&correlation_id={get_correlation_id_block.output("correlation_id")}',
    )

    def _prep_output_data(flow_run: dict, get_submissions_response: dict) -> dict:
        # For a submissionless (non-IDP) flow the query returns no results;
        # 'submission' is then None and only 'flow_run' is populated.
        submissions_in_response = get_submissions_response.get('results', [])
        submission = None if not submissions_in_response else submissions_in_response[0]
        return {'flow_run': flow_run, 'submission': submission}

    prep_output_data_block = PythonBlock(
        # NOTE(review): unlike the other blocks here, no reference_name is set —
        # confirm whether one should be added for consistency.
        title='Format Submission Response',
        # Bug fix: added the missing space between the implicitly concatenated
        # fragments (previously rendered as '...outputunder the keys...').
        description='Prepare the output data to be sent to the user-configured output '
        'under the keys "flow_run" and "submission"',
        code=_prep_output_data,
        code_input={
            'flow_run': flow_run_rest_api_block.output('result.data'),
            'get_submissions_response': fetch_submissions_block.output('result.data'),
        },
    )

    json_outputs_block = JSONOutputsBlock(inputs={'payload': prep_output_data_block.output()})

    return Flow(
        uuid=UUID(ON_ERROR_FLOW_UUID),
        owner_email='flows-sdk@hyperscience.com',
        title='On-Error with included Submission data V38 - flows-sdk',
        description=ON_ERROR_DESCRIPTION,
        manifest=OnErrorManifest(ON_ERROR_FLOW_TASK_NAME),
        output={},
        input={},
        blocks=[
            flow_run_rest_api_block,
            get_correlation_id_block,
            fetch_submissions_block,
            prep_output_data_block,
            json_outputs_block,
        ],
    )


if __name__ == '__main__':
    # Running this module directly exports the flow as a deployable package.
    # NOTE(review): this sample has no entry_point_workflow(), unlike the
    # others in this set — confirm whether packaging requires one.
    export_flow(on_error_flow())


V37

Name

Description

Release

Sample Docs

Download

IDP Starter V37

Basic document processing flow. Package includes release and sample documents for processing Form W-9s.

../_images/checkmark.png ../_images/checkmark.png

idp_flow_v37.zip

IDP Starter V37

This example is a snapshot of our V37 Intelligent Document Processing (IDP) flow. It uses both Machine and Manual blocks to:

  1. Classify and collate pages

  2. Identify and transcribe text

  3. Additional manual transcription for any fields marked for review

  4. Output results to a downstream system

Download idp_flow_v37.zip

IDP Starter V37 flow
from uuid import UUID

from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v37 import idp_blocks, idp_values
from flows_sdk.implementations.idp_v37.idp_values import (
    IdpWorkflowConfig,
    get_idp_wf_config,
    get_idp_wf_inputs,
)
from flows_sdk.package_utils import export_flow

# Stable identity for the V37 example flow release.
IDP_UUID = 'd5ba954c-8fde-437b-92ff-c87408723f91'
IDP_TASK_NAME = 'IDP_V37_EXAMPLE'


def idp_workflow(idp_wf_config: IdpWorkflowConfig) -> Flow:
    """Assemble the V37 IDP example flow.

    Machine and manual stages are chained linearly: each stage consumes the
    previous stage's 'submission' output, while most stages also read the
    shared 'api_params' produced by the bootstrap block. The manual stages
    are parameterized from `idp_wf_config`'s sub-configs.
    """

    # Create the submission record and expose submission-level API params.
    bootstrap_submission = idp_blocks.SubmissionBootstrapBlock(
        reference_name='submission_bootstrap',
    )

    # Collate pages into cases based on the API 'cases' parameter.
    case_collation_task = idp_blocks.MachineCollationBlock(
        reference_name='machine_collation',
        submission=bootstrap_submission.output('submission'),
        cases=bootstrap_submission.output('api_params.cases'),
    )

    # Machine classification; rotation correction is config-driven, mobile
    # processing is explicitly disabled for this example.
    machine_classification = idp_blocks.MachineClassificationBlock(
        reference_name='machine_classification',
        submission=case_collation_task.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
        rotation_correction_enabled=idp_wf_config.rotation_correction_enabled,
        mobile_processing_enabled=False,
    )

    # Manual classification stage.
    manual_classification = idp_blocks.ManualClassificationBlock(
        reference_name='manual_classification',
        submission=machine_classification.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
    )

    # Machine identification stage.
    machine_identification = idp_blocks.MachineIdentificationBlock(
        reference_name='machine_identification',
        submission=manual_classification.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
    )

    # Manual identification stage.
    manual_identification = idp_blocks.ManualIdentificationBlock(
        reference_name='manual_identification',
        submission=machine_identification.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
    )

    # Machine transcription stage.
    machine_transcription = idp_blocks.MachineTranscriptionBlock(
        reference_name='machine_transcription',
        submission=manual_identification.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
    )

    # Manual transcription, parameterized by the manual-transcription config.
    manual_transcription = idp_blocks.ManualTranscriptionBlock(
        reference_name='manual_transcription',
        submission=machine_transcription.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
        table_output_manual_review=(
            idp_wf_config.manual_transcription_config.table_output_manual_review
        ),
        supervision_transcription_masking=(
            idp_wf_config.manual_transcription_config.supervision_transcription_masking
        ),
        task_restrictions=idp_wf_config.manual_transcription_config.task_restrictions,
    )

    # Flexible extraction, parameterized by the flexible-extraction config.
    flexible_extraction = idp_blocks.FlexibleExtractionBlock(
        reference_name='flexible_extraction',
        submission=manual_transcription.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
        task_restrictions=idp_wf_config.flexible_extraction_config.task_restrictions,
        supervision_transcription_masking=(
            idp_wf_config.flexible_extraction_config.supervision_transcription_masking
        ),
    )

    # Mark the submission complete, ending the pipeline.
    submission_complete = idp_blocks.SubmissionCompleteBlock(
        reference_name='complete_submission', submission=flexible_extraction.output('submission')
    )

    # NOTE(review): outputs are wired to the bootstrap block's submission
    # reference rather than the completed submission — confirm this is the
    # intended source for downstream outputs.
    outputs = idp_blocks.IDPOutputsBlock(
        inputs={'submission': bootstrap_submission.output('submission')}
    )

    # Flow-level inputs derived from the workflow config.
    inputs = get_idp_wf_inputs(idp_wf_config)

    return Flow(
        uuid=UUID(IDP_UUID),
        owner_email='flows.sdk@hyperscience.com',
        title='Document Processing V37 - flows-sdk example',
        description='Document Processing V37 - flows-sdk example',
        manifest=idp_values.IDPManifest(flow_identifier=IDP_TASK_NAME),
        output={'submission': submission_complete.output()},
        triggers=idp_values.IDPTriggers(),
        input=inputs,
        blocks=[
            bootstrap_submission,
            case_collation_task,
            machine_classification,
            manual_classification,
            machine_identification,
            manual_identification,
            machine_transcription,
            manual_transcription,
            flexible_extraction,
            submission_complete,
            outputs,
        ],
    )


def entry_point_workflow() -> Flow:
    """Package entry point: build the V37 example flow from the default config."""
    return idp_workflow(get_idp_wf_config())


if __name__ == '__main__':
    # Running this module directly exports the flow as a deployable package.
    export_flow(flow=entry_point_workflow())


V36

Name

Description

Release

Sample Docs

Download

IDP Starter V36

Basic document processing flow. Package includes release and sample documents for processing Form W-9s.

../_images/checkmark.png ../_images/checkmark.png

idp_flow_v36.zip

IDP Starter V36

This example is a snapshot of our V36 Intelligent Document Processing (IDP) flow. It uses both Machine and Manual blocks to:

  1. Classify and collate pages

  2. Identify and transcribe text

  3. Additional manual transcription for any fields marked for review

  4. Output results to a downstream system

Download idp_flow_v36.zip

IDP Starter V36 flow
from uuid import UUID

from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v36 import idp_blocks, idp_values
from flows_sdk.implementations.idp_v36.idp_values import (
    IdpWorkflowConfig,
    get_idp_wf_config,
    get_idp_wf_inputs,
)
from flows_sdk.package_utils import export_flow

# Stable identity for the V36 example flow release.
IDP_UUID = '2667344c-ffef-4021-86c1-108fd54b6654'
IDP_TASK_NAME = 'IDP_V36_EXAMPLE'


def idp_workflow(idp_wf_config: IdpWorkflowConfig) -> Flow:
    """Assemble the V36 IDP example flow.

    Machine and manual stages are chained linearly: each stage consumes the
    previous stage's 'submission' output, while most stages also read the
    shared 'api_params' produced by the bootstrap block. The manual stages
    are parameterized from `idp_wf_config`'s sub-configs.
    """

    # Create the submission record and expose submission-level API params.
    bootstrap_submission = idp_blocks.SubmissionBootstrapBlock(
        reference_name='submission_bootstrap',
    )

    # Collate pages into cases based on the API 'cases' parameter.
    case_collation_task = idp_blocks.MachineCollationBlock(
        reference_name='machine_collation',
        submission=bootstrap_submission.output('submission'),
        cases=bootstrap_submission.output('api_params.cases'),
    )

    # Machine classification; rotation correction is config-driven, mobile
    # processing is explicitly disabled for this example.
    machine_classification = idp_blocks.MachineClassificationBlock(
        reference_name='machine_classification',
        submission=case_collation_task.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
        rotation_correction_enabled=idp_wf_config.rotation_correction_enabled,
        mobile_processing_enabled=False,
    )

    # Manual classification stage.
    manual_classification = idp_blocks.ManualClassificationBlock(
        reference_name='manual_classification',
        submission=machine_classification.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
    )

    # Machine identification stage.
    machine_identification = idp_blocks.MachineIdentificationBlock(
        reference_name='machine_identification',
        submission=manual_classification.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
    )

    # Manual identification stage.
    manual_identification = idp_blocks.ManualIdentificationBlock(
        reference_name='manual_identification',
        submission=machine_identification.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
    )

    # Machine transcription stage.
    machine_transcription = idp_blocks.MachineTranscriptionBlock(
        reference_name='machine_transcription',
        submission=manual_identification.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
    )

    # Manual transcription, parameterized by the manual-transcription config.
    manual_transcription = idp_blocks.ManualTranscriptionBlock(
        reference_name='manual_transcription',
        submission=machine_transcription.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
        table_output_manual_review=(
            idp_wf_config.manual_transcription_config.table_output_manual_review
        ),
        supervision_transcription_masking=(
            idp_wf_config.manual_transcription_config.supervision_transcription_masking
        ),
        task_restrictions=idp_wf_config.manual_transcription_config.task_restrictions,
    )

    # Flexible extraction, parameterized by the flexible-extraction config.
    flexible_extraction = idp_blocks.FlexibleExtractionBlock(
        reference_name='flexible_extraction',
        submission=manual_transcription.output('submission'),
        api_params=bootstrap_submission.output('api_params'),
        task_restrictions=idp_wf_config.flexible_extraction_config.task_restrictions,
        supervision_transcription_masking=(
            idp_wf_config.flexible_extraction_config.supervision_transcription_masking
        ),
    )

    # Mark the submission complete, ending the pipeline.
    submission_complete = idp_blocks.SubmissionCompleteBlock(
        reference_name='complete_submission', submission=flexible_extraction.output('submission')
    )

    # NOTE(review): outputs are wired to the bootstrap block's submission
    # reference rather than the completed submission — confirm this is the
    # intended source for downstream outputs.
    outputs = idp_blocks.IDPOutputsBlock(
        inputs={'submission': bootstrap_submission.output('submission')}
    )

    # Flow-level inputs derived from the workflow config.
    inputs = get_idp_wf_inputs(idp_wf_config)

    return Flow(
        uuid=UUID(IDP_UUID),
        owner_email='flows.sdk@hyperscience.com',
        title='Document Processing V36 - flows-sdk example',
        description='Document Processing V36 - flows-sdk example',
        manifest=idp_values.IDPManifest(flow_identifier=IDP_TASK_NAME),
        output={'submission': submission_complete.output()},
        triggers=idp_values.IDPTriggers(),
        input=inputs,
        blocks=[
            bootstrap_submission,
            case_collation_task,
            machine_classification,
            manual_classification,
            machine_identification,
            manual_identification,
            machine_transcription,
            manual_transcription,
            flexible_extraction,
            submission_complete,
            outputs,
        ],
    )


def entry_point_workflow() -> Flow:
    """Build the example IDP flow from the default workflow configuration."""
    return idp_workflow(get_idp_wf_config())


if __name__ == '__main__':
    # Serialize the flow into a deployable package when run as a script.
    example_flow = entry_point_workflow()
    export_flow(flow=example_flow)


V35

Name

Description

Release

Sample Docs

Download

IDP Starter V35

Basic document processing flow. Package includes release and sample documents for processing Form W-9s.

✓ ✓

idp_flow_v35.zip

Full Page Transcription V35

Transcribes text from documents.

idp_fpt_flow_v35.zip

IDP Starter V35

This example is a snapshot of our V35 Intelligent Document Processing (IDP) flow. It uses both Machine and Manual blocks to:

  1. Classify and collate pages

  2. Identify and transcribe text

  3. Perform additional manual transcription for any fields marked for review

  4. Output results to a downstream system

Download idp_flow_v35.zip

IDP Starter V35 flow
from uuid import UUID

from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v35 import idp_blocks, idp_values
from flows_sdk.implementations.idp_v35.idp_values import (
    IdpWorkflowConfig,
    get_idp_wf_config,
    get_idp_wf_inputs,
)
from flows_sdk.package_utils import export_flow

IDP_UUID = 'c25cc470-e2c4-4696-aeaa-26ba6cac255a'
IDP_TASK_NAME = 'IDP_V35_FLOW_EXAMPLE'


def idp_workflow(idp_wf_config: IdpWorkflowConfig) -> Flow:
    """Assemble the V35 IDP starter flow from the given workflow configuration.

    The submission object is threaded through the blocks: each block consumes
    the previous block's 'submission' output and contributes its own results.
    """
    # Initializes the submission object and prepares any external input data.
    bootstrap = idp_blocks.SubmissionBootstrapBlock(
        reference_name='submission_bootstrap',
    )

    # Groups files, documents and pages from the submission into cases.
    collation = idp_blocks.MachineCollationBlock(
        reference_name='machine_collation',
        submission=bootstrap.output('submission'),
        cases=bootstrap.output('api_params.cases'),
    )

    # Automatically matches documents to their layouts.
    machine_classify = idp_blocks.MachineClassificationBlock(
        reference_name='machine_classification',
        submission=collation.output('submission'),
        api_params=bootstrap.output('api_params'),
        rotation_correction_enabled=idp_wf_config.rotation_correction_enabled,
        mobile_processing_enabled=False,
    )

    # Lets keyers classify documents the machine step could not match confidently.
    manual_classify = idp_blocks.ManualClassificationBlock(
        reference_name='manual_classification',
        submission=machine_classify.output('submission'),
        api_params=bootstrap.output('api_params'),
    )

    # Automatically locates fields and tables in the submission.
    machine_identify = idp_blocks.MachineIdentificationBlock(
        reference_name='machine_identification',
        submission=manual_classify.output('submission'),
        api_params=bootstrap.output('api_params'),
    )

    # Lets keyers draw bounding boxes where machine identification fell short.
    manual_identify = idp_blocks.ManualIdentificationBlock(
        reference_name='manual_identification',
        submission=machine_identify.output('submission'),
        api_params=bootstrap.output('api_params'),
    )

    # Automatically transcribes the identified content.
    machine_transcribe = idp_blocks.MachineTranscriptionBlock(
        reference_name='machine_transcription',
        submission=manual_identify.output('submission'),
        api_params=bootstrap.output('api_params'),
    )

    # Lets keyers enter text that could not be transcribed automatically.
    manual_transcribe = idp_blocks.ManualTranscriptionBlock(
        reference_name='manual_transcription',
        submission=machine_transcribe.output('submission'),
        api_params=bootstrap.output('api_params'),
        table_output_manual_review=(
            idp_wf_config.manual_transcription_config.table_output_manual_review
        ),
        supervision_transcription_masking=(
            idp_wf_config.manual_transcription_config.supervision_transcription_masking
        ),
        task_restrictions=idp_wf_config.manual_transcription_config.task_restrictions,
    )

    # Additional manual transcription for fields flagged for review.
    flex_extraction = idp_blocks.FlexibleExtractionBlock(
        reference_name='flexible_extraction',
        submission=manual_transcribe.output('submission'),
        api_params=bootstrap.output('api_params'),
        task_restrictions=idp_wf_config.flexible_extraction_config.task_restrictions,
        supervision_transcription_masking=(
            idp_wf_config.flexible_extraction_config.supervision_transcription_masking
        ),
    )

    # Finalizes processing and marks the submission as complete.
    completion = idp_blocks.SubmissionCompleteBlock(
        reference_name='complete_submission', submission=flex_extraction.output('submission')
    )

    # Outputs block: downstream delivery is configured per deployment.
    delivery = idp_blocks.IDPOutputsBlock(
        inputs={'submission': bootstrap.output('submission')}
    )

    return Flow(
        uuid=UUID(IDP_UUID),
        owner_email='flows.sdk@hyperscience.com',
        title='Document Processing V35',
        description='Document Processing V35',
        manifest=idp_values.IDPManifest(flow_identifier=IDP_TASK_NAME),
        output={'submission': completion.output()},
        triggers=idp_values.IDPTriggers(),
        input=get_idp_wf_inputs(idp_wf_config),
        # Blocks must be listed in dependency order.
        blocks=[
            bootstrap,
            collation,
            machine_classify,
            manual_classify,
            machine_identify,
            manual_identify,
            machine_transcribe,
            manual_transcribe,
            flex_extraction,
            completion,
            delivery,
        ],
    )


def entry_point_idp_flow() -> Flow:
    """Entry point: build the V35 starter flow with the default configuration."""
    return idp_workflow(get_idp_wf_config())


if __name__ == '__main__':
    # Build the flow and serialize it for deployment.
    idp_flow = entry_point_idp_flow()
    export_flow(flow=idp_flow)


Full Page Transcription V35

This example transcribes text from documents.

Download idp_fpt_flow_v35.zip

Full Page Transcription V35 flow
from uuid import UUID

from flows_sdk.flows import Flow, Manifest
from flows_sdk.implementations.idp_v35.idp_blocks import (
    IDPFullPageTranscriptionBlock,
    IDPImageCorrectionBlock,
    IDPOutputsBlock,
    SubmissionBootstrapBlock,
    SubmissionCompleteBlock,
)
from flows_sdk.implementations.idp_v35.idp_values import IDPTriggers
from flows_sdk.package_utils import export_flow

IDENTIFIER = 'IDP_FULL_PAGE_TRANSCRIPTION_V35'
FLOW_UUID = '9f27b706-e063-473e-8eea-43896d1e6214'


def idp_fpt_workflow() -> Flow:
    """Build the V35 full-page transcription example flow.

    Pipeline: bootstrap the submission, correct page images, run full-page
    transcription, then finalize the submission. All QA sampling is disabled.
    """
    # Initializes the submission object and prepares any external input data.
    bootstrap = SubmissionBootstrapBlock(reference_name='submission_bootstrap')

    # Normalizes page images before transcription.
    correction = IDPImageCorrectionBlock(
        reference_name='image_correction', submission=bootstrap.output('submission')
    )

    # Transcribes the full text of every page.
    transcription = IDPFullPageTranscriptionBlock(
        reference_name='full_page_transcription', submission=correction.output('submission')
    )

    # Every flow needs a complete block: it finalizes processing and changes the
    # submission's status to "Complete". Here it receives the submission from the
    # full-page transcription block, and all Quality Assurance sampling is
    # explicitly disabled for this example.
    completion = SubmissionCompleteBlock(
        reference_name='complete_submission',
        payload=transcription.output('submission'),
        submission=transcription.output('submission'),
        nlc_qa_sampling_ratio=0,
        field_id_qa_enabled=False,
        field_id_qa_sampling_ratio=0,
        table_id_qa_enabled=False,
        table_id_qa_sampling_ratio=0,
        transcription_qa_enabled=False,
        transcription_qa_sampling_ratio=0,
        table_cell_transcription_qa_enabled=False,
        table_cell_transcription_qa_sample_rate=0,
    )

    # Empty outputs block: no downstream delivery is configured by default.
    delivery = IDPOutputsBlock(
        inputs={'submission': bootstrap.output('submission')}, blocks=[]
    )

    return Flow(
        uuid=UUID(FLOW_UUID),
        owner_email='flows.sdk@hyperscience.com',
        title='Full Page Transcription V35',
        manifest=Manifest(identifier=IDENTIFIER, input=[]),
        triggers=IDPTriggers(blocks=[]),
        blocks=[
            bootstrap,
            correction,
            transcription,
            completion,
            delivery,
        ],
        description='IDP Full Page Transcription V35',
        output={'submission': completion.output()},
    )


if __name__ == '__main__':
    # Build and export the full-page transcription flow package.
    fpt_flow = idp_fpt_workflow()
    export_flow(flow=fpt_flow)


V34

Name

Description

Release

Sample Docs

Download

IDP Starter V34

Basic document processing flow. Package includes release and sample documents for processing Form W-9s.

✓ ✓

idp_flow_v34.zip

IDP Custom Supervision V34

Sends all fields and tables from the submission to custom supervision. It will automatically collate all classified submission documents into a newly generated case if no case ID was submitted.

✓ ✓

idp_custom_supervision_v34.zip

IDP Full Page Transcription V34

Transcribes text from documents.

idp_fpt_flow_v34.zip

IDP Starter V34

This example is a snapshot of our V34 Intelligent Document Processing (IDP) flow. It uses both Machine and Manual blocks to:

  1. Classify and collate pages

  2. Identify and transcribe text

  3. Perform additional manual transcription for any fields marked for review

  4. Output results to a downstream system

Download idp_flow_v34.zip

IDP Starter V34 flow
from uuid import UUID

from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v34.idp_blocks import (
    FlexibleExtractionBlock,
    IDPOutputsBlock,
    MachineClassificationBlock,
    MachineCollationBlock,
    MachineIdentificationBlock,
    MachineTranscriptionBlock,
    ManualClassificationBlock,
    ManualIdentificationBlock,
    ManualTranscriptionBlock,
    SubmissionBootstrapBlock,
    SubmissionCompleteBlock,
)
from flows_sdk.implementations.idp_v34.idp_values import (
    IDPManifest,
    IDPTriggers,
    get_idp_wf_config,
    get_idp_wf_inputs,
)
from flows_sdk.package_utils import export_flow


def entry_point_flow() -> Flow:
    """Entry point used when packaging the V34 starter flow."""
    return idp_workflow()


def idp_workflow() -> Flow:
    """Build the V34 IDP starter flow: classify, identify, transcribe, output."""
    idp_wf_config = get_idp_wf_config()

    # The idp flow basically processes, modifies and propagates the submission object from
    # block to block
    # Each block's processing result is usually included in the submission object

    # Submission bootstrap block initializes the submission object and prepares external images
    # or other submission data if needed
    submission_bootstrap = SubmissionBootstrapBlock(reference_name='submission_bootstrap')

    # Case collation block groups files, documents and pages (from the submission) into cases
    # In this example, case collation block receives the submission object and the cases
    # information from submission bootstrap block
    machine_collation = MachineCollationBlock(
        reference_name='machine_collation',
        submission=submission_bootstrap.output('submission'),
        cases=submission_bootstrap.output('api_params.cases'),
    )

    # Machine classification block automatically matches documents to structured, semi-structured
    # or additional layouts
    # In this example, machine classification block receives the submission object from
    # case collation block
    machine_classification = MachineClassificationBlock(
        reference_name='machine_classification',
        submission=machine_collation.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        rotation_correction_enabled=idp_wf_config.rotation_correction_enabled,
    )

    # Manual classification block allows keyers to manually match submissions to their layouts.
    # Keyers may perform manual classification if machine classification cannot automatically
    # match a submission to a layout with high confidence
    # In this example, manual classification block receives the submission object from machine
    # classification block
    manual_classification = ManualClassificationBlock(
        reference_name='manual_classification',
        submission=machine_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Machine identification automatically identifies fields and tables in the submission
    # In this example, machine identification block receives the submission object from manual
    # classification block
    machine_identification = MachineIdentificationBlock(
        reference_name='machine_identification',
        submission=manual_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Manual identification allows keyers to complete field identification or table identification
    # tasks, where they draw bounding boxes around the contents of certain fields, table columns
    # or table rows. This identification process ensures that the system will be able to
    # transcribe the correct content in the upcoming transcription process
    # In this example, manual identification block receives the submission object from machine
    # identification block
    manual_identification = ManualIdentificationBlock(
        reference_name='manual_identification',
        submission=machine_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        task_restrictions=idp_wf_config.manual_identification_config.task_restrictions,
    )

    # Machine transcription automatically transcribes the content of your submission
    # In this example, machine transcription block receives the submission object from manual
    # identification block
    machine_transcription = MachineTranscriptionBlock(
        reference_name='machine_transcription',
        submission=manual_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Manual transcription lets your keyers manually enter the text found in fields or tables
    # that could not be automatically transcribed
    # In this example, manual transcription block receives the submission object from machine
    # transcription block
    manual_transcription = ManualTranscriptionBlock(
        reference_name='manual_transcription',
        submission=machine_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        supervision_transcription_masking=(
            idp_wf_config.manual_transcription_config.supervision_transcription_masking
        ),
        table_output_manual_review=(
            idp_wf_config.manual_transcription_config.table_output_manual_review
        ),
        task_restrictions=idp_wf_config.manual_transcription_config.task_restrictions,
    )

    # Flexible extraction manually transcribes fields marked for review
    # In this example, flexible extraction block receives the submission object from manual
    # transcription block
    flexible_extraction = FlexibleExtractionBlock(
        reference_name='flexible_extraction',
        submission=manual_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        supervision_transcription_masking=(
            idp_wf_config.flexible_extraction_config.supervision_transcription_masking
        ),
        task_restrictions=idp_wf_config.flexible_extraction_config.task_restrictions,
    )

    # Submission complete block finalizes submission processing and updates reporting data
    # Every flow needs a complete block because it initiates Quality Assurance tasks and
    # changes the submission's status to "Complete"
    # In this example, submission complete block receives the submission object from
    # the flexible extraction block
    submission_complete = SubmissionCompleteBlock(
        reference_name='complete_submission', submission=flexible_extraction.output('submission')
    )

    # Output block allows users to send data extracted by this idp flow to other systems
    # for downstream processing
    # In this example, this is an empty output block that does not do anything by default
    outputs = IDPOutputsBlock(inputs={'submission': submission_bootstrap.output('submission')})

    return Flow(
        # Flows should have a deterministic UUID ensuring cross-system consistency
        uuid=UUID('0c2ee4ee-0fcf-4d9b-b57a-d90519d2a002'),
        owner_email='flows.sdk@hyperscience.com',
        title='IDP Starter Example (V34)',
        # Flow identifiers are globally unique
        manifest=IDPManifest(flow_identifier='IDP_V34_FLOW_EXAMPLE'),
        # It is important to include all blocks that are instantiated here in the blocks
        # field and make sure they follow the order of the flow. For example, if machine
        # classification depends on the output of case collation, then machine_collation must
        # come before machine_classification in this blocks array
        blocks=[
            submission_bootstrap,
            machine_collation,
            machine_classification,
            manual_classification,
            machine_identification,
            manual_identification,
            machine_transcription,
            manual_transcription,
            flexible_extraction,
            submission_complete,
            outputs,
        ],
        input=get_idp_wf_inputs(idp_wf_config),
        description='A basic approach to extracting information from documents.',
        triggers=IDPTriggers(),
        output={'submission': submission_complete.output()},
    )


if __name__ == '__main__':
    # Build the starter flow and serialize it for deployment.
    starter_flow = entry_point_flow()
    export_flow(flow=starter_flow)


IDP Custom Supervision V34

This example sends all fields and tables in the submission to Custom Supervision. It will automatically collate all classified submission documents into a newly generated case if no case ID was submitted.

Download idp_custom_supervision_v34.zip

IDP Custom Supervision V34 flow
from typing import Any, List
from uuid import UUID

from flows_sdk.blocks import CodeBlock, Routing
from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v34.idp_blocks import (
    IdpCustomSupervisionBlock,
    IDPOutputsBlock,
    MachineClassificationBlock,
    MachineCollationBlock,
    MachineIdentificationBlock,
    MachineTranscriptionBlock,
    ManualClassificationBlock,
    ManualIdentificationBlock,
    ManualTranscriptionBlock,
    SubmissionBootstrapBlock,
    SubmissionCompleteBlock,
)
from flows_sdk.implementations.idp_v34.idp_values import (
    IDPManifest,
    IDPTriggers,
    get_idp_wf_config,
    get_idp_wf_inputs,
)
from flows_sdk.package_utils import export_flow


def entry_point_flow() -> Flow:
    """Entry point used when packaging the custom supervision example flow."""
    return idp_workflow()


def _collate_classified_documents(manual_transcription: Any) -> List:
    """
    Helper method responsible for an optional flow that executes the following logic:
    - If any documents in the submission already have a case ID, then no operation is necessary
    - If none of the documents in the submission are assigned to a case,
        then create a new case containing all the CLASSIFIED documents in the submission.
        Unassigned pages should not receive case IDs

    Returns the two blocks to splice into the flow: the decision CodeBlock and the
    Routing block that conditionally performs the collation. The blocks used inside
    the Routing branches are registered through the Routing block itself.
    """

    # Check the submission object's case field to see if a case has been created
    # thus far in the flow. If not, we should collate all the classified documents
    # in the submission. Otherwise, we can skip this operation.
    # NOTE(review): CodeBlock callables appear to be shipped by source into the flow
    # package, so their bodies are kept self-contained — confirm against flows_sdk docs.
    should_add_docs_to_case = CodeBlock(
        reference_name='should_add_docs_to_case',
        title='Should Add Classified Documents to a Case',
        description='If there are no cases assigned in the submission, group all classified '
        'documents into a newly created case for Custom Supervision',
        code=lambda cases: {'add_docs': 'true' if not cases else 'false'},
        code_input={'cases': manual_transcription.output('submission.cases')},
    )

    # Here we define the custom method used to grab all documents in the submission.
    def _grab_classified_doc_ids_fn(submission: Any) -> Any:
        doc_list = []
        for doc in submission.get('documents', []):
            doc_list.append(doc['id'])

        return doc_list

    # Custom Code Block to parse the submission object to find all documents. Classified
    # Documents automatically get categorized in this field.
    get_classified_doc_ids = CodeBlock(
        reference_name='get_classified_doc_ids',
        code=_grab_classified_doc_ids_fn,
        code_input={'submission': manual_transcription.output('submission')},
        title='Get All Document IDs',
        description='Grab all the Document IDs present in this submission '
        'to collate into a new case for Custom Supervision',
    )

    # We create and format the appropriate payload for our Machine Collation block to ingest.
    # Note that we are passing a null value for our external_case_id so that we can
    # automatically generate a new case ID.
    format_case_input = CodeBlock(
        reference_name='format_case_input',
        title='Format Classified Documents for Machine Collation',
        description='Creates the payload for Machine Collation',
        code=lambda docs: [{'external_case_id': None, 'documents': docs}],
        code_input={'docs': get_classified_doc_ids.output()},
    )

    # The Machine Collation Block creates the case and collates the classified documents
    # into the newly created case.
    group_into_cases = MachineCollationBlock(
        reference_name='pre_custom_sv_case_collation',
        submission=manual_transcription.output('submission'),
        cases=format_case_input.output(),
    )

    # The Routing block that is responsible for conditionally executing this flow.
    # Subsequent blocks are not reliant on the output of this Routing block.
    # The default branch is a no-op that simply passes the submission through unchanged.
    group_classified_doc_to_case = Routing(
        reference_name='group_classified_doc_to_case',
        decision=should_add_docs_to_case.output('add_docs'),
        branches=[
            Routing.Branch(
                case='true',
                blocks=[get_classified_doc_ids, format_case_input, group_into_cases],
                label='Collate Classified Documents',
            )
        ],
        default_branch=Routing.DefaultBranch(
            blocks=[
                CodeBlock(
                    reference_name='no_op_case_collation',
                    title='No Operation',
                    description='Pass the existing submission along without any changes',
                    code=lambda submission: {'submission': submission},
                    code_input={'submission': manual_transcription.output('submission')},
                )
            ]
        ),
    )

    return [should_add_docs_to_case, group_classified_doc_to_case]


def idp_workflow() -> Flow:
    idp_wf_config = get_idp_wf_config()

    # The idp flow basically processes, modifies and propagates the submission object from
    # block to block
    # Each block's processing result is usually included in the submission object

    # Submission bootstrap block initializes the submission object and prepares external images
    # or other submission data if needed
    submission_bootstrap = SubmissionBootstrapBlock(reference_name='submission_bootstrap')

    # Case collation block groups files, documents and pages (from the submission) into cases
    # In this example, case collation block receives the submission object and the cases
    # information from submission bootstrap block
    machine_collation = MachineCollationBlock(
        reference_name='machine_collation',
        submission=submission_bootstrap.output('submission'),
        cases=submission_bootstrap.output('api_params.cases'),
    )

    # Machine classification block automatically matches documents to structured, semi-structured
    # or additional layouts
    # In this example, machine classification block receives the submission object from
    # case collation block
    machine_classification = MachineClassificationBlock(
        reference_name='machine_classification',
        submission=machine_collation.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        rotation_correction_enabled=idp_wf_config.rotation_correction_enabled,
    )

    # Manual classification block allows keyers to manually match submissions to their layouts.
    # Keyers may perform manual classification if machine classification cannot automatically
    # match a submission to a layout with high confidence
    # In this example, manual classification block receives the submission object from machine
    # classification block
    manual_classification = ManualClassificationBlock(
        reference_name='manual_classification',
        submission=machine_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Machine identification automatically identify fields and tables in the submission
    # In this example, machine identification block receives the submission object from manual
    # classification
    machine_identification = MachineIdentificationBlock(
        reference_name='machine_identification',
        submission=manual_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Manual identification allows keyers to complete field identification or table identification
    # tasks, where they draw bounding boxes around the contents of certain fields, table columns
    # or table rows. This identification process ensures that the system will be able to
    # transcribe the correct content in the upcoming transcription process
    # In this example, manual identification block receives the submission object from machine
    # identification
    manual_identification = ManualIdentificationBlock(
        reference_name='manual_identification',
        submission=machine_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        task_restrictions=idp_wf_config.manual_identification_config.task_restrictions,
    )

    # Machine transcription automatically transcribes the content of your submission
    # In this example, machine identification block receives the submission object from manual
    # identification
    machine_transcription = MachineTranscriptionBlock(
        reference_name='machine_transcription',
        submission=manual_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Manual transcription lets your keyers manually enter the text found in fields or tables
    # that could not be automatically transcribed
    # In this example, manual transcription block receives the submission object from machine
    # transcription block
    manual_transcription = ManualTranscriptionBlock(
        reference_name='manual_transcription',
        submission=machine_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        supervision_transcription_masking=(
            idp_wf_config.manual_transcription_config.supervision_transcription_masking
        ),
        table_output_manual_review=(
            idp_wf_config.manual_transcription_config.table_output_manual_review
        ),
        task_restrictions=idp_wf_config.manual_transcription_config.task_restrictions,
    )

    # Here we define a function for a custom code block that gets all fields and tables from a
    # submission to display them in custom supervision. It makes sure they appear only when an
    # appropriate layout has been selected by using the dependencies settings.
    # We also create a decision for the first cell in each table.
    def _get_transcriptions_for_custom_sv_fn(submission: Any) -> Any:
        transcription_entities = []
        table_cell_decisions = []
        visited = set()
        for doc in submission.get('documents', []):
            fields = sorted(doc.get('document_fields', []), key=lambda f: f['id'])
            for field in fields:
                if field['layout_field_uuid'] in visited:
                    continue
                visited.add(field['layout_field_uuid'])
                transcription_entities.append(
                    {
                        'name': f"{field['field_name']}_{field['layout_field_uuid']}",
                        'type': 'transcription',
                        'data_identifier': field['layout_field_uuid'],
                        'ui': {'hidden': True},
                        'dependencies': [
                            {
                                'condition': {
                                    'properties': {'layoutId': {'const': doc['layout_uuid']}}
                                },
                                'override': {'ui': {'hidden': False}},
                            }
                        ],
                    }
                )
            for table in doc.get('tables', []):
                if table['layout_table_uuid'] in visited:
                    continue
                visited.add(table['layout_table_uuid'])
                transcription_entities.append(
                    {
                        'name': table['layout_table_uuid'],
                        'type': 'table_transcription',
                        'data_identifier': table['layout_table_uuid'],
                        'ui': {'hidden': True},
                        'dependencies': [
                            {
                                'condition': {
                                    'properties': {'layoutId': {'const': doc['layout_uuid']}}
                                },
                                'override': {'ui': {'hidden': False}},
                            }
                        ],
                    },
                )
                try:
                    table_cell_decisions.append(
                        {
                            'name': f'table_cell_decision_{table["layout_table_uuid"]}',
                            'type': 'decision',
                            'title': 'Table Cell Decision',
                            'ui': {'hidden': True},
                            'relation': {
                                'type': 'table_cell',
                                'match': table['columns'][0]['cells'][0]['uuid'],
                            },
                            'dependencies': [
                                {
                                    'condition': {
                                        'properties': {'layoutId': {'const': doc['layout_uuid']}}
                                    },
                                    'override': {'ui': {'hidden': False}},
                                }
                            ],
                            'schema': {
                                'oneOf': [
                                    {'const': 'accept', 'title': 'Accept Table Cell'},
                                    {'const': 'reject', 'title': 'Reject Table Cell'},
                                ]
                            },
                        },
                    )
                except (IndexError, KeyError):
                    pass
        return transcription_entities, table_cell_decisions

    # Create our Custom Code block using our function from above.
    get_transcriptions_for_custom_sv = CodeBlock(
        reference_name='get_transcriptions_for_custom_sv',
        code=_get_transcriptions_for_custom_sv_fn,
        code_input={'submission': manual_transcription.output('submission')},
        title='Get All Transcriptions',
        description='Find all transcriptions in this submission to include in Custom Supervision',
    )

    # Using the output of our Custom Code block, we can construct our three column template
    # to define our Custom Supervision task.
    format_sv_template = CodeBlock(
        title='Format Custom Supervision Template',
        description='Creates the supervision template to define our Custom Supervision task',
        reference_name='format_sv_template',
        code=lambda transcriptions: [
            {
                'name': 'three_column_template',
                'version': '1.0',
                'thumbnail': {'group_by_document': True, 'group_by_case': True},
                'action': [
                    {
                        'name': 'Transcriptions',
                        'display': 'All Identified Fields and Tables',
                        'input': transcriptions[0],
                    },
                    {
                        'name': 'Decision and Case',
                        'display': 'Make Decision - Add Case',
                        'input': [
                            *transcriptions[1],
                            {
                                'name': 'document_decision',
                                'type': 'decision',
                                'title': 'Document Decision',
                                'relation': {'type': 'document'},
                                'schema': {
                                    'oneOf': [
                                        {'const': 'accept', 'title': 'Accept Document'},
                                        {'const': 'reject', 'title': 'Reject Document'},
                                    ]
                                },
                            },
                            {'name': 'assign_to_case', 'type': 'case_dropdown', 'title': ''},
                        ],
                    },
                ],
            }
        ],
        code_input={
            'transcriptions': get_transcriptions_for_custom_sv.output(),
        },
    )

    # Create our Custom Supervision task using the template we constructed via Custom Code blocks.
    # This task will have all fields from the submission so that the user can edit them, as well
    # as a document level decision ("accept" or "reject" the document). It will also have the
    # option to add a document to a case.
    idp_custom_supervision = IdpCustomSupervisionBlock(
        reference_name='idp_custom_supervision',
        submission=manual_transcription.output('submission'),
        task_purpose='example_custom_supervision',
        supervision_template=format_sv_template.output(),
    )

    # Case collation block groups files, documents and pages (from the submission) into cases
    # In this example, case collation block receives the submission object and the cases
    # information from the custom supervision block. This block is required after Custom
    # Supervision in order to propagate case ID changes to the IDP database.
    custom_supervision_collation = MachineCollationBlock(
        reference_name='custom_supervision_collation',
        submission=idp_custom_supervision.output('submission'),
        cases=idp_custom_supervision.output('cases'),
        remove_from_cases=idp_custom_supervision.output('remove_from_cases'),
    )

    # Submission complete block finalizes submission processing and updates reporting data
    # Every flow needs a complete block because it initiates Quality Assurance tasks and
    # changes the submission's status to "Complete"
    # In this example, submission complete block receives the submission object from
    # marked_as_complete custom code block
    submission_complete = SubmissionCompleteBlock(
        reference_name='complete_submission',
        submission=custom_supervision_collation.output('submission'),
    )

    # Output block allows users to send data extracted by this idp flow to other systems
    # for downstream processing
    # In this example, this is an empty output block that does not do anything by default
    outputs = IDPOutputsBlock(inputs={'submission': submission_bootstrap.output('submission')})

    return Flow(
        # Flows should have a deterministic UUID ensuring cross-system consistency
        uuid=UUID('4cc70d7f-43c1-43ff-a7c7-708b43610558'),
        owner_email='flows.sdk@hyperscience.com',
        title='IDP Custom Supervision Block Flow Example (V34)',
        # Flow identifiers are globally unique
        manifest=IDPManifest(flow_identifier='IDP_CUSTOM_SUPERVISION_V34_EXAMPLE'),
        # It is important to include all blocks that are instantiated here in the blocks
        # field and make sure they follow the order of the flow. For example, if machine
        # classification depends on the output of case collation, then machine_collation must
        # come before machine_classification in this blocks array
        blocks=[
            submission_bootstrap,
            machine_collation,
            machine_classification,
            manual_classification,
            machine_identification,
            manual_identification,
            machine_transcription,
            manual_transcription,
            *_collate_classified_documents(manual_transcription),
            get_transcriptions_for_custom_sv,
            format_sv_template,
            idp_custom_supervision,
            custom_supervision_collation,
            submission_complete,
            outputs,
        ],
        input=get_idp_wf_inputs(idp_wf_config),
        description='IDP Custom Supervision - All Fields (V34)',
        triggers=IDPTriggers(),
        output={'submission': submission_complete.output()},
    )


# Export the flow when this module is executed directly as a script.
if __name__ == '__main__':
    export_flow(flow=entry_point_flow())


IDP Full Page Transcription V34

This example transcribes text from documents.

Download idp_fpt_flow_v34.zip

IDP Full Page Transcription V34 flow
from uuid import UUID

from flows_sdk.flows import Flow, Manifest
from flows_sdk.implementations.idp_v34.idp_blocks import (
    IDPFullPageTranscriptionBlock,
    IDPImageCorrectionBlock,
    IDPOutputsBlock,
    SubmissionBootstrapBlock,
    SubmissionCompleteBlock,
)
from flows_sdk.implementations.idp_v34.idp_values import IDPTriggers
from flows_sdk.package_utils import export_flow


def entry_point_flow() -> Flow:
    """Return the flow definition that this module exports."""
    flow = idp_fpt_workflow()
    return flow


def idp_fpt_workflow() -> Flow:
    """Build the IDP Full Page Transcription flow.

    The flow processes, modifies and propagates the submission object from
    block to block; each block's processing result is usually included in the
    submission object.
    """

    # Submission bootstrap block initializes the submission object and prepares external images
    # or other submission data if needed
    submission_bootstrap = SubmissionBootstrapBlock(reference_name='submission_bootstrap')

    # Image correction block receives the submission object from the bootstrap block
    # and prepares page images for full page transcription
    image_correction = IDPImageCorrectionBlock(
        reference_name='image_correction', submission=submission_bootstrap.output('submission')
    )

    # Full page transcription block receives the submission object from the image
    # correction block and transcribes the text of the submitted pages
    full_page_transcription = IDPFullPageTranscriptionBlock(
        reference_name='full_page_transcription', submission=image_correction.output('submission')
    )

    # Submission complete block finalizes submission processing and updates reporting data
    # Every flow needs a complete block because it initiates Quality Assurance tasks and
    # changes the submission's status to "Complete"
    # In this example, the complete block receives the submission object from the
    # full page transcription block, and every QA task type is disabled
    # (all *_enabled flags are False and all sampling ratios are 0)
    # NOTE(review): 'payload' and 'submission' are both fed the same output here —
    # presumably both parameters are required by this block's interface; confirm
    # against the SDK documentation
    submission_complete = SubmissionCompleteBlock(
        reference_name='complete_submission',
        payload=full_page_transcription.output('submission'),
        submission=full_page_transcription.output('submission'),
        nlc_qa_sampling_ratio=0,
        field_id_qa_enabled=False,
        field_id_qa_sampling_ratio=0,
        table_id_qa_enabled=False,
        table_id_qa_sampling_ratio=0,
        transcription_qa_enabled=False,
        transcription_qa_sampling_ratio=0,
        table_cell_transcription_qa_enabled=False,
        table_cell_transcription_qa_sample_rate=0,
    )

    # Output block allows users to send data extracted by this idp flow to other systems
    # for downstream processing
    # In this example, no output block is instantiated (blocks=[])
    # Setting up output blocks via UI and leaving this empty is recommended
    outputs = IDPOutputsBlock(
        inputs={'submission': submission_bootstrap.output('submission')}, blocks=[]
    )

    # Trigger block allows users to send data to idp flow via sources other than the User Interface
    # In this example, no trigger block is instantiated (blocks=[])
    # Setting up trigger blocks via UI and leaving this empty is recommended
    triggers = IDPTriggers(blocks=[])

    return Flow(
        # Flows should have a deterministic UUID ensuring cross-system consistency
        uuid=UUID('0dd837ae-44da-425a-b4be-9ffa3fc40eab'),
        owner_email='flows.sdk@hyperscience.com',
        title='IDP Full Page Transcription (V34)',
        # Flow identifiers are globally unique
        manifest=Manifest(identifier='IDP_FULL_PAGE_TRANSCRIPTION_V34', input=[]),
        triggers=triggers,
        # Blocks must be listed in execution order: each block appears after every
        # block whose output it consumes (bootstrap -> image correction ->
        # full page transcription -> complete -> outputs)
        blocks=[
            submission_bootstrap,
            image_correction,
            full_page_transcription,
            submission_complete,
            outputs,
        ],
        description='IDP Full Page Transcription (V34)',
        output={'submission': submission_complete.output()},
    )


# Export the flow when this module is executed directly as a script.
if __name__ == '__main__':
    export_flow(flow=entry_point_flow())


V33

Name

Description

Release

Sample Docs

Download

IDP Starter V33

Basic document processing flow. Package includes release and sample documents for processing Form W-9s.

../_images/checkmark.png ../_images/checkmark.png

idp_flow_v33.zip

IDP Custom Supervision V33

Sends all fields from the submission to custom supervision. It will automatically collate all classified submission documents into a newly generated case if no case ID was submitted.

../_images/checkmark.png ../_images/checkmark.png

idp_custom_supervision_v33.zip

IDP Starter V33

This example is a snapshot of our V33 Intelligent Document Processing (IDP) flow. It uses both Machine and Manual blocks to:

  1. Classify and collate pages

  2. Identify and transcribe text

  3. Perform additional manual transcription for any fields marked for review

  4. Output results to a downstream system

Download idp_flow_v33.zip

IDP Starter V33 flow
from uuid import UUID

from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v33.idp_blocks import (
    FlexibleExtractionBlock,
    IDPOutputsBlock,
    MachineClassificationBlock,
    MachineCollationBlock,
    MachineIdentificationBlock,
    MachineTranscriptionBlock,
    ManualClassificationBlock,
    ManualIdentificationBlock,
    ManualTranscriptionBlock,
    SubmissionBootstrapBlock,
    SubmissionCompleteBlock,
)
from flows_sdk.implementations.idp_v33.idp_values import (
    IDPManifest,
    IDPTriggers,
    get_idp_wf_config,
    get_idp_wf_inputs,
)
from flows_sdk.package_utils import export_flow


def entry_point_flow() -> Flow:
    """Return the flow definition that this module exports."""
    flow = idp_workflow()
    return flow


def idp_workflow() -> Flow:
    """Build the basic IDP (starter) document-processing flow.

    The flow processes, modifies and propagates the submission object from
    block to block; each block's processing result is usually included in the
    submission object.
    """
    idp_wf_config = get_idp_wf_config()

    # Submission bootstrap block initializes the submission object and prepares external images
    # or other submission data if needed
    submission_bootstrap = SubmissionBootstrapBlock(reference_name='submission_bootstrap')

    # Case collation block groups files, documents and pages (from the submission) into cases
    # In this example, case collation block receives the submission object and the cases
    # information from submission bootstrap block
    machine_collation = MachineCollationBlock(
        reference_name='machine_collation',
        submission=submission_bootstrap.output('submission'),
        cases=submission_bootstrap.output('api_params.cases'),
    )

    # Machine classification block automatically matches documents to structured, semi-structured
    # or additional layouts
    # In this example, machine classification block receives the submission object from
    # case collation block
    machine_classification = MachineClassificationBlock(
        reference_name='machine_classification',
        submission=machine_collation.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        rotation_correction_enabled=idp_wf_config.rotation_correction_enabled,
    )

    # Manual classification block allows keyers to manually match submissions to their layouts.
    # Keyers may perform manual classification if machine classification cannot automatically
    # match a submission to a layout with high confidence
    # In this example, manual classification block receives the submission object from machine
    # classification block
    manual_classification = ManualClassificationBlock(
        reference_name='manual_classification',
        submission=machine_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Machine identification block automatically identifies fields and tables in the submission
    # In this example, machine identification block receives the submission object from manual
    # classification block
    machine_identification = MachineIdentificationBlock(
        reference_name='machine_identification',
        submission=manual_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Manual identification allows keyers to complete field identification or table identification
    # tasks, where they draw bounding boxes around the contents of certain fields, table columns
    # or table rows. This identification process ensures that the system will be able to
    # transcribe the correct content in the upcoming transcription process
    # In this example, manual identification block receives the submission object from machine
    # identification block
    manual_identification = ManualIdentificationBlock(
        reference_name='manual_identification',
        submission=machine_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        task_restrictions=idp_wf_config.manual_identification_config.task_restrictions,
    )

    # Machine transcription automatically transcribes the content of your submission
    # In this example, machine transcription block receives the submission object from manual
    # identification block
    machine_transcription = MachineTranscriptionBlock(
        reference_name='machine_transcription',
        submission=manual_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Manual transcription lets your keyers manually enter the text found in fields or tables
    # that could not be automatically transcribed
    # In this example, manual transcription block receives the submission object from machine
    # transcription block
    manual_transcription = ManualTranscriptionBlock(
        reference_name='manual_transcription',
        submission=machine_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        supervision_transcription_masking=(
            idp_wf_config.manual_transcription_config.supervision_transcription_masking
        ),
        table_output_manual_review=(
            idp_wf_config.manual_transcription_config.table_output_manual_review
        ),
        task_restrictions=idp_wf_config.manual_transcription_config.task_restrictions,
    )

    # Flexible extraction manually transcribes fields marked for review
    # In this example, flexible extraction block receives the submission object from manual
    # transcription block
    flexible_extraction = FlexibleExtractionBlock(
        reference_name='flexible_extraction',
        submission=manual_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        supervision_transcription_masking=(
            idp_wf_config.flexible_extraction_config.supervision_transcription_masking
        ),
        task_restrictions=idp_wf_config.flexible_extraction_config.task_restrictions,
    )

    # Submission complete block finalizes submission processing and updates reporting data
    # Every flow needs a complete block because it initiates Quality Assurance tasks and
    # changes the submission's status to "Complete"
    # In this example, submission complete block receives the submission object from the
    # flexible extraction block
    submission_complete = SubmissionCompleteBlock(
        reference_name='complete_submission', submission=flexible_extraction.output('submission')
    )

    # Output block allows users to send data extracted by this idp flow to other systems
    # for downstream processing
    # In this example, this is an empty output block that does not do anything by default
    outputs = IDPOutputsBlock(inputs={'submission': submission_bootstrap.output('submission')})

    return Flow(
        # Flows should have a deterministic UUID ensuring cross-system consistency
        uuid=UUID('d0ac7102-37de-4445-88ba-fd0f6737ce32'),
        owner_email='flows.sdk@hyperscience.com',
        title='IDP Starter Example (V33)',
        # Flow identifiers are globally unique
        manifest=IDPManifest(flow_identifier='IDP_V33_FLOW_EXAMPLE'),
        # Blocks must be listed in execution order: each block appears after every
        # block whose output it consumes (e.g. machine_collation must come before
        # machine_classification because classification consumes collation's output)
        blocks=[
            submission_bootstrap,
            machine_collation,
            machine_classification,
            manual_classification,
            machine_identification,
            manual_identification,
            machine_transcription,
            manual_transcription,
            flexible_extraction,
            submission_complete,
            outputs,
        ],
        input=get_idp_wf_inputs(idp_wf_config),
        description='A basic approach to extracting information from documents.',
        triggers=IDPTriggers(),
        output={'submission': submission_complete.output()},
    )


# Export the flow when this module is executed directly as a script.
if __name__ == '__main__':
    export_flow(flow=entry_point_flow())


IDP Custom Supervision V33

This example sends all fields in the submission to Custom Supervision. It will automatically collate all classified submission documents into a newly generated case if no case ID was submitted.

Download idp_custom_supervision_v33.zip

IDP Custom Supervision V33 flow
from typing import Any, List
from uuid import UUID

from flows_sdk.blocks import CodeBlock, Routing
from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v33.idp_blocks import (
    IdpCustomSupervisionBlock,
    IDPOutputsBlock,
    MachineClassificationBlock,
    MachineCollationBlock,
    MachineIdentificationBlock,
    MachineTranscriptionBlock,
    ManualClassificationBlock,
    ManualIdentificationBlock,
    ManualTranscriptionBlock,
    SubmissionBootstrapBlock,
    SubmissionCompleteBlock,
)
from flows_sdk.implementations.idp_v33.idp_values import (
    IDPManifest,
    IDPTriggers,
    get_idp_wf_config,
    get_idp_wf_inputs,
)
from flows_sdk.package_utils import export_flow


def entry_point_flow() -> Flow:
    """Return the flow definition that this module exports."""
    flow = idp_workflow()
    return flow


def _collate_classified_documents(manual_transcription: Any) -> List:
    """
    Helper method responsible for an optional flow that executes the following logic:
    - If any documents in the submission already have a case ID, then no operation is necessary
    - If none of the documents in the submission are assigned to a case, then create a new case
    containing all the CLASSIFIED documents in the submission.
    Unassigned pages should not receive case IDs

    :param manual_transcription: upstream block whose 'submission' output feeds this sub-flow
    :return: the blocks to splice into the flow's block sequence
    """

    # Check the submission object's case field to see if a case has been created
    # thus far in the flow. If not, we should collate all the classified documents
    # in the submission. Otherwise, we can skip this operation.
    # The flag is the string 'true'/'false' (not a bool) because the Routing block
    # below matches branches on string values (Routing.Branch(case='true')).
    should_add_docs_to_case = CodeBlock(
        reference_name='should_add_docs_to_case',
        title='Should Add Classified Documents to a Case',
        description='If there are no cases assigned in the submission, group all classified '
        'documents into a newly created case for Custom Supervision',
        code=lambda cases: {'add_docs': 'true' if not cases else 'false'},
        code_input={'cases': manual_transcription.output('submission.cases')},
    )

    # Here we define the custom method used to grab all documents in the submission.
    def _grab_classified_doc_ids_fn(submission: Any) -> Any:
        """Return the IDs of every document present in the submission."""
        return [doc['id'] for doc in submission.get('documents', [])]

    # Custom Code Block to parse the submission object to find all documents. Classified
    # Documents automatically get categorized in this field.
    get_classified_doc_ids = CodeBlock(
        reference_name='get_classified_doc_ids',
        code=_grab_classified_doc_ids_fn,
        code_input={'submission': manual_transcription.output('submission')},
        title='Get All Document IDs',
        description='Grab all the Document IDs present in this submission '
        'to collate into a new case for Custom Supervision',
    )

    # We create and format the appropriate payload for our Machine Collation block to ingest.
    # Note that we are passing a null value for our external_case_id so that we can
    # automatically generate a new case ID.
    format_case_input = CodeBlock(
        reference_name='format_case_input',
        title='Format Classified Documents for Machine Collation',
        description='Creates the payload for Machine Collation',
        code=lambda docs: [{'external_case_id': None, 'documents': docs}],
        code_input={'docs': get_classified_doc_ids.output()},
    )

    # The Machine Collation Block creates the case and collates the classified documents
    # into the newly created case.
    group_into_cases = MachineCollationBlock(
        reference_name='pre_custom_sv_case_collation',
        submission=manual_transcription.output('submission'),
        cases=format_case_input.output(),
    )

    # The Routing block that is responsible for conditionally executing this flow.
    # Subsequent blocks are not reliant on the output of this Routing block.
    group_classified_doc_to_case = Routing(
        reference_name='group_classified_doc_to_case',
        decision=should_add_docs_to_case.output('add_docs'),
        branches=[
            Routing.Branch(
                case='true',
                blocks=[get_classified_doc_ids, format_case_input, group_into_cases],
                label='Collate Classified Documents',
            )
        ],
        # When cases already exist, pass the submission through unchanged.
        default_branch=Routing.DefaultBranch(
            blocks=[
                CodeBlock(
                    reference_name='no_op_case_collation',
                    title='No Operation',
                    description='Pass the existing submission along without any changes',
                    code=lambda submission: {'submission': submission},
                    code_input={'submission': manual_transcription.output('submission')},
                )
            ]
        ),
    )

    return [should_add_docs_to_case, group_classified_doc_to_case]


def idp_workflow() -> Flow:
    idp_wf_config = get_idp_wf_config()

    # The idp flow basically processes, modifies and propagates the submission object from
    # block to block
    # Each block's processing result is usually included in the submission object

    # Submission bootstrap block initializes the submission object and prepares external images
    # or other submission data if needed
    submission_bootstrap = SubmissionBootstrapBlock(reference_name='submission_bootstrap')

    # Case collation block groups files, documents and pages (from the submission) into cases
    # In this example, case collation block receives the submission object and the cases
    # information from submission bootstrap block
    machine_collation = MachineCollationBlock(
        reference_name='machine_collation',
        submission=submission_bootstrap.output('submission'),
        cases=submission_bootstrap.output('api_params.cases'),
    )

    # Machine classification block automatically matches documents to structured, semi-structured
    # or additional layouts
    # In this example, machine classification block receives the submission object from
    # case collation block
    machine_classification = MachineClassificationBlock(
        reference_name='machine_classification',
        submission=machine_collation.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        rotation_correction_enabled=idp_wf_config.rotation_correction_enabled,
    )

    # Manual classification block allows keyers to manually match submissions to their layouts.
    # Keyers may perform manual classification if machine classification cannot automatically
    # match a submission to a layout with high confidence
    # In this example, manual classification block receives the submission object from machine
    # classification block
    manual_classification = ManualClassificationBlock(
        reference_name='manual_classification',
        submission=machine_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Machine identification automatically identify fields and tables in the submission
    # In this example, machine identification block receives the submission object from manual
    # classification
    machine_identification = MachineIdentificationBlock(
        reference_name='machine_identification',
        submission=manual_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Manual identification allows keyers to complete field identification or table identification
    # tasks, where they draw bounding boxes around the contents of certain fields, table columns
    # or table rows. This identification process ensures that the system will be able to
    # transcribe the correct content in the upcoming transcription process
    # In this example, manual identification block receives the submission object from machine
    # identification
    manual_identification = ManualIdentificationBlock(
        reference_name='manual_identification',
        submission=machine_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        task_restrictions=idp_wf_config.manual_identification_config.task_restrictions,
    )

    # Machine transcription automatically transcribes the content of your submission
    # In this example, machine identification block receives the submission object from manual
    # identification
    machine_transcription = MachineTranscriptionBlock(
        reference_name='machine_transcription',
        submission=manual_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Manual transcription lets your keyers manually enter the text found in fields or tables
    # that could not be automatically transcribed
    # In this example, manual transcription block receives the submission object from machine
    # transcription block
    manual_transcription = ManualTranscriptionBlock(
        reference_name='manual_transcription',
        submission=machine_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        supervision_transcription_masking=(
            idp_wf_config.manual_transcription_config.supervision_transcription_masking
        ),
        table_output_manual_review=(
            idp_wf_config.manual_transcription_config.table_output_manual_review
        ),
        task_restrictions=idp_wf_config.manual_transcription_config.task_restrictions,
    )

    # Here we define a function for a custom code block that gets all fields from a submission
    # to display them in custom supervision. It makes sure they appear only when an appropriate
    # layout has been selected by using the dependencies settings.
    def _get_fields_for_custom_sv_fn(submission: Any) -> Any:
        template_fields = []
        visited = set()
        for doc in submission.get('documents', []):
            fields = sorted(doc.get('document_fields', []), key=lambda f: f['id'])
            for field in fields:
                if field['layout_field_uuid'] in visited:
                    continue
                visited.add(field['layout_field_uuid'])
                template_fields.append(
                    {
                        'name': f"{field['field_name']}_{field['layout_field_uuid']}",
                        'type': 'transcription',
                        'layout_field_id': field['layout_field_uuid'],
                        'ui': {'hidden': True},
                        'dependencies': [
                            {
                                'condition': {
                                    'properties': {'layoutId': {'const': doc['layout_uuid']}}
                                },
                                'override': {'ui': {'hidden': False}},
                            }
                        ],
                    }
                )
        return template_fields

    # Create our Custom Code block using our function from above.
    get_fields_for_custom_sv = CodeBlock(
        reference_name='get_fields_for_custom_sv',
        code=_get_fields_for_custom_sv_fn,
        code_input={'submission': manual_transcription.output('submission')},
        title='Get All Fields',
        description='Find all fields present in this submission to include in Custom Supervision',
    )

    # Using the output of our Custom Code block, we can construct our three column template
    # to define our Custom Supervision task.
    format_sv_template = CodeBlock(
        title='Format Custom Supervision Template',
        description='Creates the supervision template to define our Custom Supervision task',
        reference_name='format_sv_template',
        code=lambda fields: [
            {
                'name': 'three_column_template',
                'version': '1.0',
                'thumbnail': {'group_by_document': True, 'group_by_case': True},
                'action': [
                    {
                        'name': 'Transcribe Fields',
                        'display': 'All Identified Fields',
                        'input': fields,
                    },
                    {
                        'name': 'Decision and Case',
                        'display': 'Make Decision - Add Case',
                        'input': [
                            {
                                'name': 'document_decision',
                                'type': 'decision',
                                'title': 'Document Decision',
                                'relation': {'type': 'document'},
                                'schema': {
                                    'oneOf': [
                                        {'const': 'accept', 'title': 'Accept Document'},
                                        {'const': 'reject', 'title': 'Reject Document'},
                                    ]
                                },
                            },
                            {'name': 'assign_to_case', 'type': 'case_dropdown', 'title': ''},
                        ],
                    },
                ],
            }
        ],
        code_input={'fields': get_fields_for_custom_sv.output()},
    )

    # Create our Custom Supervision task using the template we constructed via Custom Code blocks.
    # This task will have all fields from the submission so that the user can edit them, as well
    # as a document level decision ("accept" or "reject" the document). It will also have the
    # option to add a document to a case.
    idp_custom_supervision = IdpCustomSupervisionBlock(
        reference_name='idp_custom_supervision',
        submission=manual_transcription.output('submission'),
        task_purpose='example_custom_supervision',
        supervision_template=format_sv_template.output(),
    )

    # Case collation block groups files, documents and pages (from the submission) into cases
    # In this example, case collation block receives the submission object and the cases
    # information from the custom supervision block. This block is required after Custom
    # Supervision in order to propagate case ID changes to the IDP database.
    custom_supervision_collation = MachineCollationBlock(
        reference_name='custom_supervision_collation',
        submission=idp_custom_supervision.output('submission'),
        cases=idp_custom_supervision.output('cases'),
    )

    # Submission complete block finalizes submission processing and updates reporting data
    # Every flow needs a complete block because it initiates Quality Assurance tasks and
    # changes the submission's status to "Complete"
    # In this example, submission complete block receives the submission object from
    # marked_as_complete custom code block
    submission_complete = SubmissionCompleteBlock(
        reference_name='complete_submission',
        submission=custom_supervision_collation.output('submission'),
    )

    # Output block allows users to send data extracted by this idp flow to other systems
    # for downstream processing
    # In this example, this is an empty output block that does not do anything by default
    outputs = IDPOutputsBlock(inputs={'submission': submission_bootstrap.output('submission')})

    return Flow(
        # Flows should have a deterministic UUID ensuring cross-system consistency
        uuid=UUID('4992492c-7217-48e6-a863-b0bf15ef0c5d'),
        owner_email='flows.sdk@hyperscience.com',
        title='IDP Custom Supervision Block Flow Example (V33)',
        # Flow identifiers are globally unique
        manifest=IDPManifest(flow_identifier='IDP_CUSTOM_SUPERVISION_V33_EXAMPLE'),
        # It is important to include all blocks that are instantiated here in the blocks
        # field and make sure they follow the order of the flow. For example, if machine
        # classification depends on the output of case collation, then machine_collation must
        # come before machine_classification in this blocks array
        blocks=[
            submission_bootstrap,
            machine_collation,
            machine_classification,
            manual_classification,
            machine_identification,
            manual_identification,
            machine_transcription,
            manual_transcription,
            *_collate_classified_documents(manual_transcription),
            get_fields_for_custom_sv,
            format_sv_template,
            idp_custom_supervision,
            custom_supervision_collation,
            submission_complete,
            outputs,
        ],
        input=get_idp_wf_inputs(idp_wf_config),
        description='IDP Custom Supervision - All Fields (V33)',
        triggers=IDPTriggers(),
        output={'submission': submission_complete.output()},
    )


if __name__ == '__main__':
    # Running this module as a script exports the flow definition for packaging.
    export_flow(flow=entry_point_flow())


V32

Name

Description

Release

Sample Docs

Download

Hello Flow

A single Code block.

hello_flow.zip

IDP Starter V32.0.9+

Basic document processing flow. Package includes release and sample documents for processing Form W-9s.

../_images/checkmark.png ../_images/checkmark.png

idp_flow_v32.zip

IDP Starter V32.0.0 to V32.0.8

Basic document processing flow. Package includes release and sample documents for processing Form W-9s.

../_images/checkmark.png ../_images/checkmark.png

idp_flow_v32_0_8.zip

IDP w/ Code Block

IDP Starter + a script to modify the extracted data.

custom_idp_flow_v32.zip

Invoice Validation

Validates invoice line items and the total invoice amount.

../_images/checkmark.png ../_images/checkmark.png

invoice_validation_v32.zip

PDF Redaction

Redaction of fields in a PDF by performing REGEX on fields and blacking out the corresponding bounding box.

../_images/checkmark.png ../_images/checkmark.png

pdf_redaction_v32.zip

IDP Full Page Transcription

Transcribes text from documents.

idp_fpt_flow_v32.zip

IDP Custom Supervision V32

Sends all fields from the submission to custom supervision.

../_images/checkmark.png ../_images/checkmark.png

idp_custom_supervision_v32.zip

Hello Flow

This example consists of a single Code block. It is the simplest possible executable flow.

Download hello_flow.py

Hello Flow
from uuid import UUID

from flows_sdk import utils
from flows_sdk.blocks import CodeBlock
from flows_sdk.flows import Flow, Manifest, Parameter
from flows_sdk.package_utils import export_flow

# Flow identifiers are globally unique.
# New, backward-incompatible versions are expected to use a different identifier
# (e.g. HELLO_FLOW_2).
# By convention, identifiers are snake-cased capital letter strings with an optional numeric suffix.
HELLO_FLOW_IDENTIFIER = 'HELLO_FLOW'

# Flows should have a deterministic UUID ensuring cross-system consistency
HELLO_FLOW_UUID = UUID('2e3ab564-fcf5-41fb-a573-4bc2fd153b6d')


def entry_point_flow() -> Flow:
    """Return the flow that is exported when this module is run as a script."""
    return sample_flow_with_secret()


# Flow inputs can be referenced in blocks, so it is usually a good idea to define
# their names as constants in one place
class FlowInputs:
    # Name of the single flow-level input parameter declared in the manifest below
    HELLO_INPUT = 'hello_input'


def sample_flow_with_secret() -> Flow:
    """Build the Hello World flow: a single Code block that greets the flow input.

    NOTE(review): despite the name, this sample does not demonstrate secrets;
    it only shows how a flow input is passed into a Code block.
    """

    def code_fn(code_block_input_param: str) -> str:
        # The Code block simply formats a greeting from its single input
        return f'Hello {code_block_input_param}'

    # Parameters can be added to a :func:`~Flow`
    hello_input_param = Parameter(
        name=FlowInputs.HELLO_INPUT, title='Hello input', type='string', optional=False
    )

    ccb = CodeBlock(
        reference_name='hello_ccb',
        code=code_fn,
        code_input={'code_block_input_param': utils.workflow_input(FlowInputs.HELLO_INPUT)},
    )

    # Fixed: the original passed a misspelled `depedencies={}` keyword argument to
    # `Flow`; no other example constructs a Flow with such a parameter, so the
    # bogus argument has been removed.
    return Flow(
        title='Hello World Flow',
        description='A simple Flow showcasing how inputs are passed',
        blocks=[ccb],
        owner_email='flows.sdk@hyperscience.com',
        manifest=Manifest(identifier=HELLO_FLOW_IDENTIFIER, input=[hello_input_param]),
        uuid=HELLO_FLOW_UUID,
        input={FlowInputs.HELLO_INPUT: 'World'},
    )


if __name__ == '__main__':
    # Running this module as a script exports the flow definition for packaging.
    export_flow(flow=entry_point_flow())


IDP Starter V32.0.9+

This example is a snapshot of our V32 Intelligent Document Processing (IDP) flow. It uses both Machine and Manual blocks to:

  1. Classify and collate pages

  2. Identify and transcribe text

  3. Perform additional manual transcription for any fields marked for review

  4. Output results to a downstream system

Download idp_flow_v32.zip

IDP Starter V32.0.9+ flow
from uuid import UUID

from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v32.idp_blocks import (
    CaseCollationBlock,
    FlexibleExtractionBlock,
    IDPOutputsBlock,
    MachineClassificationBlock,
    MachineIdentificationBlock,
    MachineTranscriptionBlock,
    ManualClassificationBlock,
    ManualIdentificationBlock,
    ManualTranscriptionBlock,
    SubmissionBootstrapBlock,
    SubmissionCompleteBlock,
)
from flows_sdk.implementations.idp_v32.idp_values import (
    IDPManifest,
    IDPTriggers,
    get_idp_wf_config,
    get_idp_wf_inputs,
)
from flows_sdk.package_utils import export_flow


def entry_point_flow() -> Flow:
    """Return the flow that is exported when this module is run as a script."""
    return idp_workflow()


def idp_workflow() -> Flow:
    """Build the V32 IDP starter flow: classify, identify, transcribe, review, complete."""
    idp_wf_config = get_idp_wf_config()

    # The idp flow basically processes, modifies and propagates the submission object from
    # block to block
    # Each block's processing result is usually included in the submission object

    # Submission bootstrap block initializes the submission object and prepares external images
    # or other submission data if needed
    submission_bootstrap = SubmissionBootstrapBlock(reference_name='submission_bootstrap')

    # Case collation block groups files, documents and pages (from the submission) into cases
    # In this example, case collation block receives the submission object and the cases
    # information from submission bootstrap block
    case_collation = CaseCollationBlock(
        reference_name='case_collation',
        submission=submission_bootstrap.output('submission'),
        cases=submission_bootstrap.output('api_params.cases'),
    )

    # Machine classification block automatically matches documents to structured, semi-structured
    # or additional layouts
    # In this example, machine classification block receives the submission object from
    # case collation block
    machine_classification = MachineClassificationBlock(
        reference_name='machine_classification',
        submission=case_collation.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        rotation_correction_enabled=idp_wf_config.rotation_correction_enabled,
    )

    # Manual classification block allows keyers to manually match submissions to their layouts.
    # Keyers may perform manual classification if machine classification cannot automatically
    # match a submission to a layout with high confidence
    # In this example, manual classification block receives the submission object from machine
    # classification block
    manual_classification = ManualClassificationBlock(
        reference_name='manual_classification',
        submission=machine_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Machine identification automatically identifies fields and tables in the submission
    # In this example, machine identification block receives the submission object from manual
    # classification
    machine_identification = MachineIdentificationBlock(
        reference_name='machine_identification',
        submission=manual_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Manual identification allows keyers to complete field identification or table identification
    # tasks, where they draw bounding boxes around the contents of certain fields, table columns
    # or table rows. This identification process ensures that the system will be able to
    # transcribe the correct content in the upcoming transcription process
    # In this example, manual identification block receives the submission object from machine
    # identification
    manual_identification = ManualIdentificationBlock(
        reference_name='manual_identification',
        submission=machine_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        task_restrictions=idp_wf_config.manual_identification_config.task_restrictions,
    )

    # Machine transcription automatically transcribes the content of your submission
    # In this example, machine transcription block receives the submission object from manual
    # identification
    machine_transcription = MachineTranscriptionBlock(
        reference_name='machine_transcription',
        submission=manual_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Manual transcription lets your keyers manually enter the text found in fields or tables
    # that could not be automatically transcribed
    # In this example, manual transcription block receives the submission object from machine
    # transcription block
    manual_transcription = ManualTranscriptionBlock(
        reference_name='manual_transcription',
        submission=machine_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        supervision_transcription_masking=(
            idp_wf_config.manual_transcription_config.supervision_transcription_masking
        ),
        table_output_manual_review=(
            idp_wf_config.manual_transcription_config.table_output_manual_review
        ),
        task_restrictions=idp_wf_config.manual_transcription_config.task_restrictions,
    )

    # Flexible extraction manually transcribes fields marked for review
    # In this example, flexible extraction block receives the submission object from manual
    # transcription block
    flexible_extraction = FlexibleExtractionBlock(
        reference_name='flexible_extraction',
        submission=manual_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        supervision_transcription_masking=(
            idp_wf_config.flexible_extraction_config.supervision_transcription_masking
        ),
        task_restrictions=idp_wf_config.flexible_extraction_config.task_restrictions,
    )

    # Submission complete block finalizes submission processing and updates reporting data
    # Every flow needs a complete block because it initiates Quality Assurance tasks and
    # changes the submission's status to "Complete"
    # In this example, submission complete block receives the submission object from
    # the flexible extraction block
    submission_complete = SubmissionCompleteBlock(
        reference_name='submission_complete', submission=flexible_extraction.output('submission')
    )

    # Output block allows users to send data extracted by this idp flow to other systems
    # for downstream processing
    # In this example, this is an empty output block that does not do anything by default
    outputs = IDPOutputsBlock(inputs={'submission': submission_bootstrap.output('submission')})

    return Flow(
        # Flows should have a deterministic UUID ensuring cross-system consistency
        uuid=UUID('5d1515a9-ae37-45fc-bb03-d7dda943b60d'),
        owner_email='flows.sdk@hyperscience.com',
        title='IDP Starter Example (V32)',
        # Flow identifiers are globally unique
        manifest=IDPManifest(flow_identifier='IDP_V32_FLOW_EXAMPLE'),
        # It is important to include all blocks that are instantiated here in the blocks
        # field and make sure they follow the order of the flow. For example, if machine
        # classification depends on the output of case collation, then case_collation must
        # come before machine_classification in this blocks array
        blocks=[
            submission_bootstrap,
            case_collation,
            machine_classification,
            manual_classification,
            machine_identification,
            manual_identification,
            machine_transcription,
            manual_transcription,
            flexible_extraction,
            submission_complete,
            outputs,
        ],
        input=get_idp_wf_inputs(idp_wf_config),
        description='A basic approach to extracting information from documents.',
        triggers=IDPTriggers(),
        output={'submission': submission_complete.output()},
    )


if __name__ == '__main__':
    # Running this module as a script exports the flow definition for packaging.
    export_flow(flow=entry_point_flow())


IDP Starter V32.0.0 to V32.0.8

This flow uses an older version of the FlexibleExtractionBlock that was removed after platform version 32.0.8. This example is a snapshot of our V32 Intelligent Document Processing (IDP) flow. It uses both Machine and Manual blocks to:

  1. Classify and collate pages

  2. Identify and transcribe text

  3. Perform additional manual transcription for any fields marked for review

  4. Output results to a downstream system

Download idp_flow_v32_0_8.zip

IDP Starter V32.0.0 to V32.0.8 flow
from uuid import UUID

from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v32.idp_blocks import (
    CaseCollationBlock,
    IDPOutputsBlock,
    LegacyFlexibleExtractionBlock,
    MachineClassificationBlock,
    MachineIdentificationBlock,
    MachineTranscriptionBlock,
    ManualClassificationBlock,
    ManualIdentificationBlock,
    ManualTranscriptionBlock,
    SubmissionBootstrapBlock,
    SubmissionCompleteBlock,
)
from flows_sdk.implementations.idp_v32.idp_values import (
    IDPManifest,
    IDPTriggers,
    get_idp_wf_config,
    get_idp_wf_inputs,
)
from flows_sdk.package_utils import export_flow


def entry_point_flow() -> Flow:
    """Return the flow that is exported when this module is run as a script."""
    return idp_workflow()


def idp_workflow() -> Flow:
    """Build the legacy (V32.0.0–V32.0.8) IDP starter flow.

    Identical to the V32.0.9+ starter except it uses LegacyFlexibleExtractionBlock.
    NOTE(review): this variant reuses the same UUID and flow identifier as the
    V32.0.9+ starter — only one of the two should be deployed; confirm intended.
    """
    idp_wf_config = get_idp_wf_config()

    # The idp flow basically processes, modifies and propagates the submission object from
    # block to block
    # Each block's processing result is usually included in the submission object

    # Submission bootstrap block initializes the submission object and prepares external images
    # or other submission data if needed
    submission_bootstrap = SubmissionBootstrapBlock(reference_name='submission_bootstrap')

    # Case collation block groups files, documents and pages (from the submission) into cases
    # In this example, case collation block receives the submission object and the cases
    # information from submission bootstrap block
    case_collation = CaseCollationBlock(
        reference_name='case_collation',
        submission=submission_bootstrap.output('submission'),
        cases=submission_bootstrap.output('api_params.cases'),
    )

    # Machine classification block automatically matches documents to structured, semi-structured
    # or additional layouts
    # In this example, machine classification block receives the submission object from
    # case collation block
    machine_classification = MachineClassificationBlock(
        reference_name='machine_classification',
        submission=case_collation.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        rotation_correction_enabled=idp_wf_config.rotation_correction_enabled,
    )

    # Manual classification block allows keyers to manually match submissions to their layouts.
    # Keyers may perform manual classification if machine classification cannot automatically
    # match a submission to a layout with high confidence
    # In this example, manual classification block receives the submission object from machine
    # classification block
    manual_classification = ManualClassificationBlock(
        reference_name='manual_classification',
        submission=machine_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Machine identification automatically identifies fields and tables in the submission
    # In this example, machine identification block receives the submission object from manual
    # classification
    machine_identification = MachineIdentificationBlock(
        reference_name='machine_identification',
        submission=manual_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Manual identification allows keyers to complete field identification or table identification
    # tasks, where they draw bounding boxes around the contents of certain fields, table columns
    # or table rows. This identification process ensures that the system will be able to
    # transcribe the correct content in the upcoming transcription process
    # In this example, manual identification block receives the submission object from machine
    # identification
    manual_identification = ManualIdentificationBlock(
        reference_name='manual_identification',
        submission=machine_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        task_restrictions=idp_wf_config.manual_identification_config.task_restrictions,
    )

    # Machine transcription automatically transcribes the content of your submission
    # In this example, machine transcription block receives the submission object from manual
    # identification
    machine_transcription = MachineTranscriptionBlock(
        reference_name='machine_transcription',
        submission=manual_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
    )

    # Manual transcription lets your keyers manually enter the text found in fields or tables
    # that could not be automatically transcribed
    # In this example, manual transcription block receives the submission object from machine
    # transcription block
    manual_transcription = ManualTranscriptionBlock(
        reference_name='manual_transcription',
        submission=machine_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        supervision_transcription_masking=(
            idp_wf_config.manual_transcription_config.supervision_transcription_masking
        ),
        table_output_manual_review=(
            idp_wf_config.manual_transcription_config.table_output_manual_review
        ),
        task_restrictions=idp_wf_config.manual_transcription_config.task_restrictions,
    )

    # Flexible extraction manually transcribes fields marked for review
    # In this example, flexible extraction block receives the submission object from manual
    # transcription block
    flexible_extraction = LegacyFlexibleExtractionBlock(
        reference_name='flexible_extraction',
        submission=manual_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        supervision_transcription_masking=(
            idp_wf_config.flexible_extraction_config.supervision_transcription_masking
        ),
        task_restrictions=idp_wf_config.flexible_extraction_config.task_restrictions,
    )

    # Submission complete block finalizes submission processing and updates reporting data
    # Every flow needs a complete block because it initiates Quality Assurance tasks and
    # changes the submission's status to "Complete"
    # In this example, submission complete block receives the submission object from
    # the flexible extraction block
    submission_complete = SubmissionCompleteBlock(
        reference_name='submission_complete', submission=flexible_extraction.output('submission')
    )

    # Output block allows users to send data extracted by this idp flow to other systems
    # for downstream processing
    # In this example, this is an empty output block that does not do anything by default
    outputs = IDPOutputsBlock(inputs={'submission': submission_bootstrap.output('submission')})

    return Flow(
        # Flows should have a deterministic UUID ensuring cross-system consistency
        uuid=UUID('5d1515a9-ae37-45fc-bb03-d7dda943b60d'),
        owner_email='flows.sdk@hyperscience.com',
        title='IDP Starter Example (V32)',
        # Flow identifiers are globally unique
        manifest=IDPManifest(flow_identifier='IDP_V32_FLOW_EXAMPLE'),
        # It is important to include all blocks that are instantiated here in the blocks
        # field and make sure they follow the order of the flow. For example, if machine
        # classification depends on the output of case collation, then case_collation must
        # come before machine_classification in this blocks array
        blocks=[
            submission_bootstrap,
            case_collation,
            machine_classification,
            manual_classification,
            machine_identification,
            manual_identification,
            machine_transcription,
            manual_transcription,
            flexible_extraction,
            submission_complete,
            outputs,
        ],
        input=get_idp_wf_inputs(idp_wf_config),
        description='A basic approach to extracting information from documents.',
        triggers=IDPTriggers(),
        output={'submission': submission_complete.output()},
    )


if __name__ == '__main__':
    # Running this module as a script exports the flow definition for packaging.
    export_flow(flow=entry_point_flow())


IDP w/ Code Block

This example is identical to the IDP Starter V32.0.9+ example, except that it includes an additional Code Block to add “time completed” metadata to each document in the submission. This addition of metadata is an example of how Code Blocks can modify extracted data before outputting to downstream systems.

Download custom_idp_flow_v32.py

IDP w/ Code Block flow
from typing import Any
from uuid import UUID

from flows_sdk.blocks import CodeBlock
from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v32.idp_blocks import (
    CaseCollationBlock,
    FlexibleExtractionBlock,
    IDPOutputsBlock,
    MachineClassificationBlock,
    MachineIdentificationBlock,
    MachineTranscriptionBlock,
    ManualClassificationBlock,
    ManualIdentificationBlock,
    ManualTranscriptionBlock,
    SubmissionBootstrapBlock,
    SubmissionCompleteBlock,
)
from flows_sdk.implementations.idp_v32.idp_values import (
    IDPManifest,
    IDPTriggers,
    get_idp_wf_config,
    get_idp_wf_inputs,
)
from flows_sdk.package_utils import export_flow


def entry_point_flow() -> Flow:
    """Return the flow that is exported when this module is run as a script."""
    return idp_workflow()


def idp_workflow() -> Flow:
    """Build the V32 IDP flow with an extra Code block that stamps completion metadata."""
    idp_wf_config = get_idp_wf_config()

    # The idp flow basically processes, modifies and propagates the submission object from
    # block to block
    # Each block's processing result is usually included in the submission object

    # Submission bootstrap block initializes the submission object and prepares external images
    # or other submission data if needed
    submission_bootstrap = SubmissionBootstrapBlock(reference_name='submission_bootstrap')

    # Case collation block groups files, documents and pages (from the submission) into cases
    # In this example, case collation block receives the submission object and the cases
    # information from submission bootstrap block
    # NOTE(review): the reference_name here is 'machine_collation', unlike the starter
    # example's 'case_collation' — confirm this is intended
    case_collation = CaseCollationBlock(
        reference_name='machine_collation',
        submission=submission_bootstrap.output('submission'),
        cases=submission_bootstrap.output('api_params.cases'),
    )

    # Machine classification block automatically matches documents to structured, semi-structured
    # or additional layouts
    # In this example, machine classification block receives the submission object from
    # case collation block
    machine_classification = MachineClassificationBlock(
        reference_name='machine_classification',
        submission=case_collation.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Manual classification block allows keyers to manually match submissions to their layouts.
    # Keyers may perform manual classification if machine classification cannot automatically
    # match a submission to a layout with high confidence
    # In this example, manual classification block receives the submission object from machine
    # classification block
    manual_classification = ManualClassificationBlock(
        reference_name='manual_classification',
        submission=machine_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Machine identification automatically identifies fields and tables in the submission
    # In this example, machine identification block receives the submission object from manual
    # classification
    machine_identification = MachineIdentificationBlock(
        reference_name='machine_identification',
        submission=manual_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Manual identification allows keyers to complete field identification or table identification
    # tasks, where they draw bounding boxes around the contents of certain fields, table columns
    # or table rows. This identification process ensures that the system will be able to
    # transcribe the correct content in the upcoming transcription process
    # In this example, manual identification block receives the submission object from machine
    # identification
    manual_identification = ManualIdentificationBlock(
        reference_name='manual_identification',
        submission=machine_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Machine transcription automatically transcribes the content of your submission
    # In this example, machine transcription block receives the submission object from manual
    # identification
    machine_transcription = MachineTranscriptionBlock(
        reference_name='machine_transcription',
        submission=manual_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Manual transcription lets your keyers manually enter the text found in fields or tables
    # that could not be automatically transcribed
    # In this example, manual transcription block receives the submission object from machine
    # transcription block
    manual_transcription = ManualTranscriptionBlock(
        reference_name='manual_transcription',
        submission=machine_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Flexible extraction manually transcribes fields marked for review
    # In this example, flexible extraction block receives the submission object from manual
    # transcription block
    flexible_extraction = FlexibleExtractionBlock(
        reference_name='flexible_extraction',
        submission=manual_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    def _mark_as_completed(submission: Any) -> Any:
        """Set every state field to 'complete' and stamp UTC completion times.

        Adds an ISO-8601 UTC `complete_time` (with a literal 'Z' suffix appended)
        to the submission and each document, and marks the submission, all
        documents, their pages, and any unassigned pages as 'complete'.
        """
        from datetime import datetime

        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; consider
        # datetime.now(timezone.utc) if the platform runtime allows it
        dt_completed = datetime.isoformat(datetime.utcnow())
        dt_completed_fmt = dt_completed + 'Z'

        for document in submission['documents']:
            document['state'] = 'complete'
            document['complete_time'] = dt_completed_fmt

            for page in document['pages']:
                page['state'] = 'complete'

        for page in submission['unassigned_pages']:
            page['state'] = 'complete'

        submission['state'] = 'complete'
        submission['complete_time'] = dt_completed_fmt

        return submission

    # Custom code block enables users to transform and validate extracted submission data
    # before Hyperscience sends it to downstream systems
    # In this example, the user created a _mark_as_completed function to transform and validate
    # submission data
    # Notice that the _mark_as_completed function takes in a single argument which is passed
    # in using the code_input parameter
    custom_code = CodeBlock(
        reference_name='mark_as_completed',
        code=_mark_as_completed,
        code_input={'submission': flexible_extraction.output('submission')},
        title='Mark As Completed',
        description='Updated Transformed JSON to Completed State',
    )

    # Submission complete block finalizes submission processing and updates reporting data
    # Every flow needs a complete block because it initiates Quality Assurance tasks and
    # changes the submission's status to "Complete"
    # In this example, submission complete block receives the submission object from
    # the mark_as_completed custom code block
    submission_complete = SubmissionCompleteBlock(
        reference_name='complete_submission',
        submission=custom_code.output(),  # this CCB returns the submission as a top-level object
        payload=custom_code.output(),
    )

    # Output block allows users to send data extracted by this idp flow to other systems
    # for downstream processing
    # In this example, no output block is instantiated (blocks=[])
    # Setting up output blocks via UI and leaving this empty is recommended
    outputs = IDPOutputsBlock(
        inputs={'submission': submission_bootstrap.output('submission')}, blocks=[]
    )

    # Trigger block allows users to send data to idp flow via sources other than the User Interface
    # In this example, no trigger block is instantiated (blocks=[])
    # Setting up trigger blocks via UI and leaving this empty is recommended
    triggers = IDPTriggers(blocks=[])

    return Flow(
        # Flows should have a deterministic UUID ensuring cross-system consistency
        uuid=UUID('f923871d-8742-45cd-ae6d-e0429c098421'),
        owner_email='flows.sdk@hyperscience.com',
        title='IDP with Custom Code Block Flow Example (V32)',
        # Flow identifiers are globally unique
        manifest=IDPManifest(flow_identifier='IDP_WITH_CUSTOM_CODE_V32_FLOW_EXAMPLE'),
        triggers=triggers,
        # It is important to include all blocks that are instantiated here in the blocks
        # field and make sure they follow the order of the flow. For example, if machine
        # classification depends on the output of case collation, then case_collation must
        # come before machine_classification in this blocks array
        blocks=[
            submission_bootstrap,
            case_collation,
            machine_classification,
            manual_classification,
            machine_identification,
            manual_identification,
            machine_transcription,
            manual_transcription,
            flexible_extraction,
            custom_code,
            submission_complete,
            outputs,
        ],
        input=get_idp_wf_inputs(idp_wf_config),
        description='Intelligent Document Processing with Custom Code Block Flow Example (V32)',
        output={'submission': submission_complete.output()},
    )


if __name__ == '__main__':
    # Running this module as a script exports the flow definition for packaging.
    export_flow(flow=entry_point_flow())


Invoice Validation

This example validates invoice line items and the total invoice amount and routes to a Flexible Extraction block for discrepancies.

Download invoice_validation_v32.zip

Invoice Validation flow
from typing import Any, Dict, List, Optional
from uuid import UUID

from flows_sdk.blocks import CodeBlock
from flows_sdk.flows import Flow, Parameter
from flows_sdk.implementations.idp_v32.idp_blocks import (
    CaseCollationBlock,
    FlexibleExtractionBlock,
    IDPOutputsBlock,
    MachineClassificationBlock,
    MachineIdentificationBlock,
    MachineTranscriptionBlock,
    ManualClassificationBlock,
    ManualIdentificationBlock,
    ManualTranscriptionBlock,
    SubmissionBootstrapBlock,
    SubmissionCompleteBlock,
)
from flows_sdk.implementations.idp_v32.idp_values import (
    IDPManifest,
    IDPTriggers,
    get_idp_wf_config,
    get_idp_wf_inputs,
)
from flows_sdk.package_utils import export_flow
from flows_sdk.utils import workflow_input


class CustomSettings:
    """Display names of the custom workflow settings exposed by this flow."""

    # When enabled, documents with validation discrepancies bypass Flexible Extraction
    SkipFlexibleExtraction = 'Skip Flexible Extraction For Validation Discrepancies'
    # When enabled, columns_not_transcribed messages are omitted from the output
    IgnoreLineItemTranscription = 'Ignore Table Fields Not Transcribed'

# Workflow parameters surfaced in the flow's settings UI; both are optional
# booleans whose title mirrors their name.
LINE_ITEM_VALIDATION_PARAMETERS: List[Parameter] = [
    Parameter(
        name=setting_name,
        type='boolean',
        title=setting_name,
        ui={'hidden': False},
        description=setting_description,
        optional=True,
    )
    for setting_name, setting_description in (
        (
            CustomSettings.SkipFlexibleExtraction,
            'Check this box to skip Flexible Extraction when line item validation and '
            'total amount discrepancies occur. Useful when downstream processes address '
            'any discrepancies',
        ),
        (
            CustomSettings.IgnoreLineItemTranscription,
            'Check this box to prevent columns_not_transcribed messages in the output',
        ),
    )
]


def entry_point_flow() -> Flow:
    """Entry point used when exporting this flow; delegates to idp_workflow()."""
    flow = idp_workflow()
    return flow


def idp_workflow() -> Flow:
    idp_wf_config = get_idp_wf_config()

    # The idp flow basically processes, modifies and propagates the submission object from
    # block to block
    # Each block's processing result is usually included in the submission object

    # Submission bootstrap block initializes the submission object and prepares external images
    # or other submission data if needed
    submission_bootstrap = SubmissionBootstrapBlock(reference_name='submission_bootstrap')

    # Case collation block groups files, documents and pages (from the submission) into cases
    # In this example, case collation block receives the submission object and the cases
    # information from submission bootstrap block
    case_collation = CaseCollationBlock(
        reference_name='machine_collation',
        submission=submission_bootstrap.output('submission'),
        cases=submission_bootstrap.output('api_params.cases'),
    )

    # Machine classification block automatically matches documents to structured, semi-structured
    # or additional layouts
    # In this example, machine classification block receives the submission object from
    # case collation block
    machine_classification = MachineClassificationBlock(
        reference_name='machine_classification',
        submission=case_collation.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Manual classification block allows keyers to manually match submissions to their layouts.
    # Keyers may perform manual classification if machine classification cannot automatically
    # match a submission to a layout with high confidence
    # In this example, manual classification block receives the submission object from machine
    # classification block
    manual_classification = ManualClassificationBlock(
        reference_name='manual_classification',
        submission=machine_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Machine identification automatically identify fields and tables in the submission
    # In this example, machine identification block receives the submission object from manual
    # classification
    machine_identification = MachineIdentificationBlock(
        reference_name='machine_identification',
        submission=manual_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Manual identification allows keyers to complete field identification or table identification
    # tasks, where they draw bounding boxes around the contents of certain fields, table columns
    # or table rows. This identification process ensures that the system will be able to
    # transcribe the correct content in the upcoming transcription process
    # In this example, manual identification block receives the submission object from machine
    # identification
    manual_identification = ManualIdentificationBlock(
        reference_name='manual_identification',
        submission=machine_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Machine transcription automatically transcribes the content of your submission
    # In this example, machine identification block receives the submission object from manual
    # identification
    machine_transcription = MachineTranscriptionBlock(
        reference_name='machine_transcription',
        submission=manual_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Manual transcription lets your keyers manually enter the text found in fields or tables
    # that could not be automatically transcribed
    # In this example, manual transcription block receives the submission object from machine
    # transcription block
    manual_transcription = ManualTranscriptionBlock(
        reference_name='manual_transcription',
        submission=machine_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    def _perform_line_item_validations(
        submission: Any,
        api_params_ref: Any,
        wf_param_skip_fleex: Any,
        wf_param_ignore_transcribed: Any,
    ) -> Any:
        """
        Pseudo Code:
            Iterate submission documents:
                Initialize invoice total, if total_invoice_amount field transcribed
                Convert document table to row format from submission columnar format
                Iterate data rows:
                    Determine transcription results for each row
                    Determine calculations for each row from transcription results
                    Store transcription & calculation results in row_results object
                    For row columns both transcribed and calculated perform validations
                        Mark for Flexible Extraction row columns with differences
                        Mark for IDP Sync columns calculated but not transcribed (future)
                Mark for Flexible Extraction if line item calculations differ from invoice total
        Dictionary objects:
            row_results: Contains information about transcription/calculation for each row
            document_validation: Contains all row results for the document
            line_item_validation: Contains the results for all documents in the submission
        """

        import logging
        from typing import Tuple

        import numpy  # type: ignore # pylint: disable=import-error

        logger = logging.getLogger(__name__)
        logger.info('table_line_item_validation: _perform_line_item_validations(): entered')

        import locale

        locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')

        TRANSCRIPTION_SOURCE_CUSTOM = 'custom'

        def _my_get_float(value: str) -> float:
            if value != '':
                try:
                    currency_symbol: str = locale.localeconv()['currency_symbol']  # type: ignore
                    return locale.atof(value.strip(currency_symbol))
                except:
                    raise ValueError(f'Could not convert value="{value}" to float')
            else:
                return 0.0

        def _format_column(value: float, column_name: str = '') -> str:
            try:
                lcol = column_name.lower()
                if 'quantity' in lcol or 'unit_price' in lcol:
                    return '{:.4f}'.format(value)
                elif 'total_price' in lcol or 'amount' in lcol:
                    return _my_get_currency(value)
                else:
                    return '{:.2f}'.format(value)
            except Exception as e:
                logger.warning(
                    '_format_column(): error occurred for float value={}, '
                    'column_name={}, error={}',
                    value,
                    column_name,
                    str(e),
                )
                return '0.0'

        def _my_get_currency(value: float) -> str:
            """Format ``value`` as a locale-aware currency string with digit grouping."""
            return locale.currency(value, grouping=True)

        def _can_calculate(line_item_data: Dict[str, Any], calc_columns: set) -> bool:
            """
            Shortcut function for determining whether a line item has enough information
            to perform a calculation
            """
            if not calc_columns.issubset(line_item_data.keys()):
                return False
            else:
                for calc_col in calc_columns:
                    if line_item_data[calc_col]['has_value'] is False:
                        return False
            return True

        def _table_columns_to_row_data(document: Any) -> Dict[Tuple[int, int, int], Any]:
            """Converts document table cells to a different format"""
            row_data: Dict[Tuple[int, int, int], Any] = {}

            for column in document.get('table', {}).get('columns', []):
                if column.get('output_name', '') not in CALC_LINE_ITEM_ALL_COLS:
                    continue

                for cell in column.get('cells', []):
                    row_key = (document['id'], cell['page_number'], cell['row_number'])
                    row_data.setdefault(row_key, {}).update(
                        {column['output_name']: cell.get('transcription_normalized', '')}
                    )

            return row_data

        def _get_row_results_transcribed(row_column: Dict[Any, Any]) -> Dict[str, Dict[Any, Any]]:
            """
            Returns a dict for line item columns (Quantity, Unit Price, Total Price) showing
            results of the transcription:
                * was the field transcribed
                * does the transcribed field contain a value (i.e. not empty string)
                * the normalized value
                * a float representation of the normalized value
            """
            ret: Dict[str, Dict[Any, Any]] = {}
            for col_name in CALC_LINE_ITEM_ALL_COLS:
                # Column entirely absent from this row -> skip (distinct from the
                # present-but-empty case handled below)
                if col_name not in row_column:
                    continue

                transcribed = False
                has_value = False

                # Truthy check: only non-empty transcriptions create an entry in ret
                if row_column.get(col_name):
                    transcribed = True
                    has_value = True
                    normalized: str = row_column.get(col_name, '')
                    ret.update({col_name: {'transcription_normalized': normalized}})
                    try:
                        normalized_float = _my_get_float(normalized)
                    except ValueError as e:
                        # has_value = False
                        # Unparsable values keep a 0.0 float and record the error
                        # instead of dropping the column
                        normalized_float = 0.0
                        ret.get(col_name, {}).setdefault('exceptions', []).append(
                            f'Transcription error for {col_name}: {str(e)}'
                        )

                    ret.get(col_name, {}).update({'float_value': normalized_float})

                # NOTE(review): when the column value is empty/falsy, col_name was never
                # added to ``ret``, so this update mutates a throwaway default dict and
                # the transcribed/has_value flags are silently dropped for that column --
                # confirm whether empty columns were meant to appear in the result
                ret.get(col_name, {}).update({'transcribed': transcribed, 'has_value': has_value})

            return ret

        def _get_row_results_calculated(line_item_data: Dict[str, Any]) -> Dict[str, float]:
            """
            Returns a dict for calculations on a line item. Only columns that can be calculated
            are included in the return dict. This serves as an indicator that we can validate
            a line item. IOW, if a column is not present in the row results calculated then we
            can't validate the line item
            """

            # NOTE: transcription errors are appended to ``row_results``, a variable of
            # the enclosing per-row loop (closure), not a local of this function
            if _can_calculate(line_item_data, {'quantity'}):
                try:
                    q = _my_get_float(line_item_data['quantity']['transcription_normalized'])
                except Exception as e:
                    row_results.setdefault('exceptions_calculated', []).append(
                        f'Transcription error for quantity: {str(e)}'
                    )
                    q = 0.0

            if _can_calculate(line_item_data, {'unit_price'}):
                try:
                    u = _my_get_float(line_item_data['unit_price']['transcription_normalized'])
                except Exception as e:
                    row_results.setdefault('exceptions_calculated', []).append(
                        f'Transcription error for unit_price: {str(e)}'
                    )
                    u = 0.0

            if _can_calculate(line_item_data, {'total_price'}):
                try:
                    t = _my_get_float(line_item_data['total_price']['transcription_normalized'])
                except Exception as e:
                    row_results.setdefault('exceptions_calculated', []).append(
                        f'Transcription error for total_price: {str(e)}'
                    )
                    t = 0.0

            ret = {}
            # Each multi-column check below is a superset of the single-column checks
            # above, so q/u/t are always bound when read here
            if _can_calculate(line_item_data, CALC_TOTAL_PRICE_COLS):
                ret.update({'total_price': q * u})

            # Division guards: a zero divisor simply skips the derived column
            if _can_calculate(line_item_data, CALC_QUANTITY_COLS):
                if u != 0.0:
                    ret.update({'quantity': t / u})

            if _can_calculate(line_item_data, CALC_UNIT_PRICE_COLS):
                if q != 0.0:
                    ret.update({'unit_price': t / q})

            return ret

        def _require_idp_sync(
            column_name: str, row_key: Tuple[int, int, int], transcription_normalized: str
        ) -> None:
            """
            Record a cell whose value was calculated (not transcribed) so it can
            later be written back via IDP sync.

            TODO: When/if we merge line item columns that were not transcribed but could be
                calculated we would then perform IDP sync. Note when performing IDP sync for
                a table that all rows and columns must be supplied to the IDP sync process.
            """
            # Flags are set on the submission-level and document-level summaries,
            # which come from the enclosing validation loop (closures)
            line_item_validation.update({'idp_sync': True})
            document_validation.update({'idp_sync': True})

            # row_key is (document id, page number, row number)
            sync_cell = {
                'column_name': column_name,
                'document_id': row_key[0],
                'page_number': row_key[1],
                'row_number': row_key[2],
                'transcription_normalized': transcription_normalized,
            }

            document_validation.setdefault('sync_columns', []).append(sync_cell)

        def _require_flexible_extraction_for_cell(
            column_name: str, row_key: Tuple[int, int, int], message: str
        ) -> None:
            """
            Marks a table cell for Flexible Extraction. An exception string is appended to
             the submission, document, and field/table cell.
            """

            # Mutates the enclosing loop's summary dicts (closures)
            line_item_validation.update({'is_valid': False})
            document_validation.update({'is_valid': False})

            # Identity check: only an explicit False enables routing to Flexible
            # Extraction; an unset/None workflow setting also enables it
            if wf_param_skip_fleex is False:
                line_item_validation.update({'flexible_extraction': True})
                document_validation.update({'flexible_extraction': True})

            if column_name and row_key:
                # Linear scan of the document table (closure) for the matching
                # column, then for the cell at (page_number, row_number)
                doc_table_col = {}
                doc_table_cell = {}
                for column in document.get('table', {}).get('columns', []):
                    if column['output_name'] == column_name:
                        doc_table_col = column
                        for cell in column.get('cells', []):
                            if (cell['page_number'], cell['row_number']) == (
                                row_key[1],
                                row_key[2],
                            ):
                                doc_table_cell = cell
                                break
                        break

                if doc_table_col and doc_table_cell:
                    document_validation.setdefault('columns', []).append(
                        {
                            'field_name': doc_table_col['field_name'],
                            'output_name': column_name,
                            'page_number': row_key[1],
                            'row_number': row_key[2],
                            'flexible_extraction': True,
                            'exceptions': [message],
                        }
                    )

                    if wf_param_skip_fleex is False:
                        doc_table_cell['process_flexible_extraction_type'] = 'FORCE'
                        doc_table_cell['needs_flexible_extraction_review'] = True

                    # Tag the cell so downstream processing can tell this value was
                    # modified by custom code
                    doc_table_cell['transcription_source'] = TRANSCRIPTION_SOURCE_CUSTOM
                    if message:
                        doc_table_cell.setdefault('exceptions', []).append(message)
                        document.setdefault('exceptions', []).append(message)
                        submission.setdefault('exceptions', []).append(
                            f'page_{row_key[1]}_row_{row_key[2]}: {message}'
                        )
            return

        def _require_flexible_extraction_reason(reason: str) -> None:
            """
            Flags the current document (and the submission-level summary) for
            Flexible Extraction, recording ``reason`` once as an exception.
            """
            # Unconditionally mark both summary dicts (closures) invalid and
            # destined for Flexible Extraction
            line_item_validation.update({'is_valid': False, 'flexible_extraction': True})
            document_validation.update({'is_valid': False, 'flexible_extraction': True})

            # Record the reason at most once per document
            if reason and reason not in document_validation.get('reasons', []):
                document_validation.setdefault('reasons', []).append(reason)
                document.setdefault('exceptions', []).append(reason)
                submission.setdefault('exceptions', []).append(reason)

        def _require_flexible_extraction_for_field(
            field_id: int, reason: str = '', message: str = ''
        ) -> None:
            """
            Marks the document_field for Flexible Extraction. An exception string is appended
             to the submission, document, and field/table cell.
            """
            # Mutates the enclosing loop's summary dicts (closures)
            line_item_validation.update({'is_valid': False})
            document_validation.update({'is_valid': False})

            line_item_validation.update({'flexible_extraction': True})
            document_validation.update({'flexible_extraction': True})

            if reason:
                if reason not in document_validation.get('reasons', []):
                    document_validation.setdefault('reasons', []).append(reason)
                    document.setdefault('exceptions', []).append(reason)
                    submission.setdefault('exceptions', []).append(
                        f'document id={document["id"]}; {reason}'
                    )

            if message:
                if message not in document_validation.get('exceptions', []):
                    document_validation.setdefault('exceptions', []).append(message)
                    document.setdefault('exceptions', []).append(message)
                    submission.setdefault('exceptions', []).append(
                        f'document id={document["id"]}; {message}'
                    )
                    line_item_validation.setdefault('exceptions', []).append(message)

            if field_id:
                if 'fields' in document_validation:
                    # First matching previously-recorded field entry; raises IndexError
                    # if 'fields' exists but no entry has this field_id
                    validation_field: Dict[Any, Any] = [
                        field
                        for field in document_validation.get('fields', [])
                        if field['field_id'] == field_id
                    ][0]
                    if validation_field:
                        if reason and reason not in validation_field.get('reasons', []):
                            validation_field.setdefault('reasons', []).append(reason)
                        if message and message not in validation_field.get('exceptions', []):
                            validation_field.setdefault('exceptions', []).append(message)
                        else:
                            # NOTE(review): this branch appends ``reason`` to the field's
                            # exceptions whenever ``message`` is empty or already recorded --
                            # possibly unintended (can append '' or duplicate); confirm intent
                            validation_field.setdefault('exceptions', []).append(reason)
                        return

                # NOTE(review): .get('document_fields') has no default here (unlike other
                # lookups); a document without that key would make this iterate None, and
                # [0] raises IndexError when no field matches -- confirm inputs guarantee both
                doc_field = [
                    field for field in document.get('document_fields') if field['id'] == field_id
                ][0]

                if doc_field:
                    document_validation.setdefault('fields', []).append(
                        {
                            'field_id': field_id,
                            'name': doc_field['field_name'],
                            'output_name': doc_field['output_name'],
                            'flexible_extraction': True,
                            'reasons': [reason],
                            'exceptions': [message],
                        }
                    )

                    if wf_param_skip_fleex is False:
                        doc_field['process_flexible_extraction_type'] = 'FORCE'
                        doc_field['needs_flexible_extraction_review'] = True

                    # Tag the field so downstream processing can tell this value was
                    # modified by custom code
                    doc_field['transcription_source'] = TRANSCRIPTION_SOURCE_CUSTOM

            return

        CALC_LINE_ITEM_ALL_COLS = {'quantity', 'unit_price', 'total_price'}

        CALC_TOTAL_PRICE_COLS = {'quantity', 'unit_price'}
        CALC_QUANTITY_COLS = {'unit_price', 'total_price'}
        CALC_UNIT_PRICE_COLS = {'quantity', 'total_price'}

        line_item_validation: Dict[Any, Any] = {}

        for document in submission.get('documents', []):
            document_validation: Dict[Any, Any] = {'is_valid': True, 'flexible_extraction': False}

            invoice: Dict[Any, Any] = {'running_total': 0.0}

            # Initialize invoice total, if total_invoice_amount field transcribed
            for field in document.get('document_fields', []):
                if field.get('output_name') == 'total_invoice_amount':
                    logger.info('table_line_item_validation: found total_invoice_amount')
                    if field.get('transcription_normalized'):
                        try:
                            total_invoice_amount = _my_get_float(field['transcription_normalized'])
                        except ValueError as e:
                            total_invoice_amount = 0.0
                            msg = f'Transcription error for total_invoice_amount: {str(e)}'
                            _require_flexible_extraction_for_field(
                                field['id'], 'total_invoice_amount_transcription_error', msg
                            )

                        invoice.update(
                            {'total_invoice_amount': total_invoice_amount or 0.0, 'field': field}
                        )

            if 'table' not in document:
                continue

            if not line_item_validation:
                # initialize line_item_validation first time we encounter a document
                # assume document is valid and fleex is not required
                line_item_validation = {'is_valid': True, 'flexible_extraction': False}

            line_item_columns = [
                column
                for column in document.get('table', {}).get('columns', [])
                if column['output_name'] in CALC_LINE_ITEM_ALL_COLS
            ]

            if not line_item_columns:
                logger.info('No line item validation columns found in table')
                document.setdefault('exceptions', []).append(
                    f'No line item validation columns found in table: {CALC_LINE_ITEM_ALL_COLS}'
                )
                document_validation.setdefault('exceptions', []).append(
                    f'No line item validation columns found in table: {CALC_LINE_ITEM_ALL_COLS}'
                )
                line_item_validation.setdefault('exceptions', []).append(
                    f'No line item validation columns found in table: document id={document["id"]}'
                )
                continue

            # Convert document table to row format from submission columnar format
            #
            #   row_key is Tuple(document['id'], cell['page_number'], cell['row_number'])
            #       {
            #           output_name: transcription_normalized
            #       }
            #
            row_data = _table_columns_to_row_data(document)

            if not row_data:
                document.setdefault('exceptions', []).append(
                    'No table row data available for validations'
                )
                document_validation.setdefault('exceptions', []).append(
                    'No table row data available for validations'
                )
                line_item_validation.setdefault('exceptions', []).append(
                    'No table row data available for validations for document id '
                    f'{document["id"]}'
                )

            for row_key, row_data_info in row_data.items():
                row_results: Dict[Any, Any] = {
                    'all_transcribed': False,
                    'columns_transcribed': set(),
                    'columns_not_transcribed': set(),
                    'exceptions_transcribed': [],
                    'all_calculated': False,
                    'columns_calculated': set(),
                    'columns_not_calculated': set(),
                    'exceptions_calculated': [],
                }

                # Determine transcription results for each row
                line_item_transcribed = _get_row_results_transcribed(row_data_info)

                # a set containing only columns that were transcribed
                columns_transcribed = {
                    col_name
                    for col_name, col_info in line_item_transcribed.items()
                    if col_info['transcribed']
                }
                columns_not_transcribed = CALC_LINE_ITEM_ALL_COLS.difference(columns_transcribed)

                transcription_exceptions = [
                    f'id_{row_key[0]}_page_{row_key[1]}_row_{row_key[2]}: {s}'
                    for v in line_item_transcribed.values()
                    for s in v.get('exceptions', [])
                ]

                # the CustomSetting doesn't apply to errors for fields
                # that have been transcribed
                #
                # the CustomSetting prevents errors when a line item field was not found
                if transcription_exceptions:
                    document_validation.setdefault('exceptions', []).extend(
                        transcription_exceptions
                    )
                    line_item_validation.setdefault('exceptions', []).extend(
                        transcription_exceptions
                    )

                # add the line item amount to the running total
                if 'total_price' in line_item_transcribed:
                    total_price_exceptions = line_item_transcribed.get('total_price', {}).get(
                        'exceptions', []
                    )
                    if len(total_price_exceptions) > 0:
                        _require_flexible_extraction_for_cell(
                            'total_price', row_key, ','.join(total_price_exceptions)
                        )

                    if line_item_transcribed.get('total_price', {}).get('has_value', False):
                        invoice.update(
                            {
                                'running_total': invoice['running_total']
                                + line_item_transcribed['total_price']['float_value']
                            }
                        )

                # Determine calculations for each row from transcription results
                # Calculate line item columns based on transcription results
                #
                # a column is only present in line_item_calculated if it can be calculated
                #
                line_item_calculated = _get_row_results_calculated(line_item_transcribed)

                columns_calculated = set(line_item_calculated.keys())
                columns_not_calculated = CALC_LINE_ITEM_ALL_COLS.difference(columns_calculated)

                # Store transcription & calculation results in row_results object
                #
                # sets are converted to lists since sets cannot be serialized which results in a
                # TypeError during flow execution
                #
                row_results.update(
                    {
                        'all_transcribed': columns_transcribed == CALC_LINE_ITEM_ALL_COLS,
                        'columns_transcribed': list(columns_transcribed),
                        'columns_not_transcribed': list(columns_not_transcribed),
                        'exceptions_transcribed': transcription_exceptions,
                        'all_calculated': line_item_calculated.keys() == CALC_LINE_ITEM_ALL_COLS,
                        'columns_calculated': list(columns_calculated),
                        'columns_not_calculated': list(columns_not_calculated),
                    }
                )

                # For row columns both transcribed and calculated perform validations
                for col_name in columns_transcribed.intersection(columns_calculated):
                    transcribed_value = line_item_transcribed[col_name]['float_value']
                    calculated_value = line_item_calculated[col_name]
                    difference = transcribed_value - calculated_value
                    row_results.update(
                        {
                            col_name: {
                                'transcribed_value': transcribed_value,
                                'calculated_value': _format_column(calculated_value, col_name),
                                'difference': _format_column(difference, col_name),
                            }
                        }
                    )

                    if not numpy.isclose(calculated_value, transcribed_value):
                        message = (
                            f'{col_name}: value_difference; transcribed='
                            f'{_format_column(transcribed_value, col_name)}, '
                            f'calculated={_format_column(calculated_value, col_name)}, '
                            f'difference={_format_column(difference, col_name)}'
                        )

                        _require_flexible_extraction_for_cell(col_name, row_key, message)

                if wf_param_ignore_transcribed is False:
                    #
                    # Mark for IDP Sync columns calculated but not transcribed
                    #
                    # For example, unit price & total price were transcribed but quantity
                    # was not. The quantity can be calculated and the added to the row
                    #
                    # TODO: if we've calculated columns that were not transcribed we can
                    #   add them through IDP sync. If the invoice total validation fails
                    #   then also send the added row cell to flexible extraction
                    for col_name in columns_calculated.difference(columns_transcribed):
                        # this column was calculated but not transcribed - add it via IDP sync later
                        _require_idp_sync(
                            col_name,
                            row_key,
                            _format_column(line_item_calculated[col_name], col_name),
                        )
                        row_results.get('idp_sync_cols', []).append(col_name)

                    if not row_results['all_transcribed']:
                        _require_flexible_extraction_reason('columns_not_transcribed')

            # Mark for Flexible Extraction if line item calculations differ from invoice total
            if 'total_invoice_amount' in invoice:
                document_validation.update(
                    {
                        'total_amount': _format_column(invoice['total_invoice_amount']),
                        'line_item_total_amount': _format_column(invoice['running_total']),
                    }
                )

                if numpy.isclose(invoice['running_total'], 0.0):

                    _require_flexible_extraction_reason('could_not_calculate_total_invoice_amount')

                elif not numpy.isclose(invoice['total_invoice_amount'], invoice['running_total']):

                    document_validation.update(
                        {
                            'total_difference': _format_column(
                                invoice['total_invoice_amount'] - invoice['running_total']
                            )
                        }
                    )

                    message = (
                        'Line item total does not equal invoice total: '
                        f'{invoice["field"]["field_name"]}={invoice["total_invoice_amount"]} '
                        '(Transcribed); '
                        f'Line Item Total={_format_column(invoice["running_total"])} '
                        '(Calculated); '
                        f'Difference={document_validation["total_difference"]}; '
                    )

                    _require_flexible_extraction_for_field(
                        field_id=invoice['field']['id'],
                        message=message,
                        reason='total_invoice_amount_difference',
                    )
            else:
                document_validation.update({'is_valid': False})
                document_validation.setdefault('exceptions', []).append(
                    'Could not locate field output_name=total_invoice_amount'
                )
                line_item_validation.update({'is_valid': False})
                line_item_validation.setdefault('exceptions', []).append(
                    'Could not locate field output_name=total_invoice_amount '
                    f'for document id: {document["id"]}'
                )

            # set to fleex when it's required and the skip param is false
            if (
                document_validation.get('flexible_extraction', False)
                and wf_param_skip_fleex is False
            ):
                document['needs_flexible_extraction_review'] = True

            line_item_validation.update({f'doc_{document["id"]}': document_validation})
            if document_validation.get('is_valid', True) is False:
                line_item_validation.update({'is_valid': False})

        if 'is_valid' not in line_item_validation:
            # no documents were processed when is_valid is not in line_item_validation
            line_item_validation.update({'is_valid': False})
            line_item_validation.setdefault('exceptions', []).append(
                'No validations occurred due ' 'to incomplete submission data'
            )

        submission.update({'line_item_validation': line_item_validation})

        logger.info('table_line_item_validation: _perform_line_item_validations(): returned')

        return {'submission': submission, 'line_item_validation': line_item_validation}

    # Custom code block running the line-item validation logic; receives the
    # submission from manual transcription plus two workflow parameters that
    # control Flexible Extraction routing and transcription checks.
    line_item_validation = CodeBlock(
        reference_name='perform_validate_line_items',
        code=_perform_line_item_validations,
        code_input={
            'submission': manual_transcription.output('submission'),
            'api_params_ref': submission_bootstrap.output('api_params'),
            'wf_param_skip_fleex': workflow_input(CustomSettings.SkipFlexibleExtraction),
            'wf_param_ignore_transcribed': workflow_input(
                CustomSettings.IgnoreLineItemTranscription
            ),
        },
        title='Validate Line Items',
        # The previous implicit string concatenation ('Validates the Invoice'
        # 's Line Items') rendered as "Validates the Invoices Line Items" —
        # the apostrophe was lost.
        description="Validates the Invoice's Line Items",
    )

    # Flexible extraction manually transcribes fields marked for review
    # In this example, the flexible extraction block receives the submission object from the
    # line item validation code block (which marks fields/cells needing review), not directly
    # from manual transcription
    flexible_extraction = FlexibleExtractionBlock(
        reference_name='flexible_extraction',
        submission=line_item_validation.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    def _load_submission(submission: Any) -> Any:
        import inspect

        submission_id_ref = submission['id']
        proxy = inspect.stack()[1].frame.f_locals['proxy']
        r = proxy.sdm_get(f'api/v5/submissions/{submission_id_ref}?flat=False')
        return r.json()

    # Custom code block that re-fetches the submission via the HTTP API so that
    # downstream blocks receive it in API v5 format
    load_submission = CodeBlock(
        reference_name='load_submission',
        code=_load_submission,
        code_input={'submission': flexible_extraction.output('submission')},
        title='Load Submission',
        description='Returns Submission in API v5 Format',
    )

    def _mark_as_complete(submission: Any, line_item_validation: Any) -> Any:
        if line_item_validation:
            submission.update({'line_item_validation': line_item_validation})

        from datetime import datetime

        dt_completed = datetime.isoformat(datetime.utcnow())
        dt_completed_fmt = dt_completed + 'Z'

        for document in submission.get('documents', []):
            document['state'] = 'complete'
            document['complete_time'] = dt_completed_fmt

            for page in document.get('pages', []):
                page['state'] = 'complete'

            for field in document.get('document_fields', []):
                field['state'] = 'complete'

            for cell in [
                cell
                for document_table in document.get('document_tables', [])
                for row in document_table.get('rows', [])
                for cell in row.get('cells', [])
            ]:
                cell['state'] = 'complete'

        for page in submission.get('unassigned_pages', []):
            page['state'] = 'complete'

        if 'state' in submission:
            submission['state'] = 'complete'
        if 'complete_time' in submission:
            submission['complete_time'] = dt_completed_fmt

        return submission

    # Custom code block enables users to transform and validate extracted submission data
    # before Hyperscience sends it to downstream systems
    # In this example, the user-defined _mark_as_complete function transforms the submission
    # data and attaches the line-item validation results
    # Notice that _mark_as_complete takes two arguments, both of which are passed
    # in using the code_input parameter
    mark_as_complete = CodeBlock(
        reference_name='mark_as_completed',
        code=_mark_as_complete,
        code_input={
            'submission': load_submission.output(),
            'line_item_validation': line_item_validation.output('line_item_validation'),
        },
        title='Mark As Completed',
        description='Updated Transformed JSON to Completed State',
    )

    # Submission complete block finalizes submission processing and updates reporting data
    # Every flow needs a complete block because it initiates Quality Assurance tasks and
    # changes the submission's status to "Complete"
    # In this example, submission complete block receives the submission object from the
    # mark_as_complete custom code block (used both as payload and as submission)
    submission_complete = SubmissionCompleteBlock(
        reference_name='complete_submission',
        payload=mark_as_complete.output(),
        submission=mark_as_complete.output(),
    )

    # Output block allows users to send data extracted by this idp flow to other systems
    # for downstream processing
    # In this example, this is an empty output block that does not do anything by default
    # Note it is wired to the bootstrap submission output, not to the completed pipeline
    outputs = IDPOutputsBlock(inputs={'submission': submission_bootstrap.output('submission')})

    # Assemble the flow; kept in a local variable so the manifest inputs and
    # workflow input defaults can be extended below before returning
    invoice_flow = Flow(
        # Flows should have a deterministic UUID ensuring cross-system consistency
        uuid=UUID('749bbcfa-020f-428f-8878-b1206e2a4d79'),
        owner_email='flows.sdk@hyperscience.com',
        title='Line Item Validations',
        # Flow identifiers are globally unique
        manifest=IDPManifest(flow_identifier='LINE_ITEM_VALIDATION'),
        # It is important to include all blocks that are instantiated here in the blocks
        # field and make sure they follow the order of the flow. For example, if machine
        # classification depends on the output of case collation, then case_collation must
        # come before machine_classification in this blocks array
        blocks=[
            submission_bootstrap,
            case_collation,
            machine_classification,
            manual_classification,
            machine_identification,
            manual_identification,
            machine_transcription,
            manual_transcription,
            line_item_validation,
            flexible_extraction,
            load_submission,
            mark_as_complete,
            submission_complete,
            outputs,
        ],
        input=get_idp_wf_inputs(idp_wf_config),
        description='Perform validation for total invoice amount and for line item quantity, '
        'unit price, and total price',
        triggers=IDPTriggers(),
        output={'submission': submission_complete.output()},
    )

    # Extend the flow manifest with the custom line-item validation parameters
    inputs: List[Parameter] = invoice_flow.manifest.input or []
    # NOTE(review): if manifest.input is None, the appended parameters land in a
    # fresh list that is never assigned back to the manifest — confirm that
    # IDPManifest always initializes 'input' to a list
    for i in LINE_ITEM_VALIDATION_PARAMETERS:
        inputs.append(i)

    wf_input: Optional[Dict[str, Any]] = invoice_flow.input
    # avoid mypy error 'has no attribute update'
    assert wf_input is not None

    # Default values for the two custom workflow settings added above
    wf_input.update(
        {
            CustomSettings.SkipFlexibleExtraction: False,
            CustomSettings.IgnoreLineItemTranscription: False,
        }
    )

    return invoice_flow


# Script entry point: serialize the flow for packaging/import into the platform
if __name__ == '__main__':
    export_flow(flow=entry_point_flow())


PDF Redaction

This example redacts fields in a PDF by performing regular-expression-style matching on field/segment names and blacking out the corresponding bounding boxes.

Download pdf_redaction_v32.zip

PDF Redaction flow
from typing import Any
from uuid import UUID

from flows_sdk.blocks import CodeBlock
from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v32.idp_blocks import (
    CaseCollationBlock,
    FlexibleExtractionBlock,
    IDPOutputsBlock,
    MachineClassificationBlock,
    MachineIdentificationBlock,
    MachineTranscriptionBlock,
    ManualClassificationBlock,
    ManualIdentificationBlock,
    ManualTranscriptionBlock,
    SubmissionBootstrapBlock,
    SubmissionCompleteBlock,
)
from flows_sdk.implementations.idp_v32.idp_values import (
    IDPManifest,
    IDPTriggers,
    get_idp_wf_config,
    get_idp_wf_inputs,
)
from flows_sdk.package_utils import export_flow


def entry_point_flow() -> Flow:
    """Packaging entry point: return the flow definition for export."""
    return idp_workflow()


def idp_workflow() -> Flow:
    """Build the field-based PDF redaction flow.

    Chains the standard IDP blocks (bootstrap -> collation -> classification
    -> identification -> transcription -> flexible extraction), then custom
    code blocks that black out configured fields/cells on the page images and
    attach the result as a PDF, before completing the submission.
    """
    idp_wf_config = get_idp_wf_config()

    # The idp flow basically processes, modifies and propagates the submission object from
    # block to block
    # Each block's processing result is usually included in the submission object

    # Submission bootstrap block initializes the submission object and prepares external images
    # or other submission data if needed
    submission_bootstrap = SubmissionBootstrapBlock()

    # Case collation block groups files, documents and pages (from the submission) into cases
    # In this example, case collation block receives the submission object and the cases
    # information from submission bootstrap block
    case_collation = CaseCollationBlock(
        submission=submission_bootstrap.output('submission'),
        cases=submission_bootstrap.output('api_params.cases'),
    )

    # Machine classification block automatically matches documents to structured, semi-structured
    # or additional layouts
    # In this example, machine classification block receives the submission object from
    # case collation block
    machine_classification = MachineClassificationBlock(
        submission=case_collation.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Manual classification block allows keyers to manually match submissions to their layouts.
    # Keyers may perform manual classification if machine classification cannot automatically
    # match a submission to a layout with high confidence
    # In this example, manual classification block receives the submission object from machine
    # classification block
    manual_classification = ManualClassificationBlock(
        submission=machine_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Machine identification automatically identifies fields and tables in the submission
    # In this example, machine identification block receives the submission object from manual
    # classification
    machine_identification = MachineIdentificationBlock(
        submission=manual_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Manual identification allows keyers to complete field identification or table identification
    # tasks, where they draw bounding boxes around the contents of certain fields, table columns
    # or table rows. This identification process ensures that the system will be able to
    # transcribe the correct content in the upcoming transcription process
    # In this example, manual identification block receives the submission object from machine
    # identification
    manual_identification = ManualIdentificationBlock(
        submission=machine_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Machine transcription automatically transcribes the content of your submission
    # In this example, machine transcription block receives the submission object from manual
    # identification
    machine_transcription = MachineTranscriptionBlock(
        submission=manual_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Manual transcription lets your keyers manually enter the text found in fields or tables
    # that could not be automatically transcribed
    # In this example, manual transcription block receives the submission object from machine
    # transcription block
    manual_transcription = ManualTranscriptionBlock(
        submission=machine_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Flexible extraction manually transcribes fields marked for review
    # In this example, flexible extraction block receives the submission object from manual
    # transcription block
    flexible_extraction = FlexibleExtractionBlock(
        submission=manual_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    def _load_submission(submission: Any) -> Any:
        import inspect

        submission_id_ref = submission['id']
        proxy = inspect.stack()[1].frame.f_locals['proxy']
        r = proxy.sdm_get(f'api/v5/submissions/{submission_id_ref}?flat=False')
        return r.json()

    # Custom code block that re-fetches the submission via the HTTP API so that
    # downstream blocks receive it in API v5 format
    # NOTE(review): unlike the other examples, no reference_name is passed here —
    # confirm whether CodeBlock requires one in this SDK version
    load_submission = CodeBlock(
        code=_load_submission,
        code_input={'submission': flexible_extraction.output('submission')},
        title='Load Submission',
        description='Returns Submission in API v5 Format',
    )

    def _redact_pdf(submission: Any) -> Any:
        """Black out redactable fields/cells on each page and attach a PDF.

        Fields whose output_name contains 'REDACT_' and table cells whose
        column_name contains 'REDACT' contribute normalized bounding boxes that
        are filled with black rectangles on the corrected page images. The
        redacted pages are joined into a multi-page TIFF (tiffcp), converted to
        a PDF (tiff2pdf), stored as a blob, and linked on the document under
        'redacted_pdf'.

        :param submission: submission payload in API v5 (nested) format
        :return: the mutated submission (returned unmodified if either external
            tool writes to stderr)
        """
        import inspect
        import os
        import subprocess
        import tempfile

        import cv2  # type: ignore # pylint: disable=import-error

        # pylint: disable=import-error
        from blocks.base_python_block import BlobCreateParams  # type: ignore
        from sdm_image.image_utils.image_read import blob_to_cv2_image  # type: ignore

        # pylint: enable=import-error

        proxy = inspect.stack()[1].frame.f_locals['proxy']
        color = (0, 0, 0)  # solid black fill
        thickness = -1  # negative thickness => filled rectangle

        # Track every temp file we create so the finally block can always close
        # (and thereby delete) them. The previous version only closed the last
        # document's files, leaked files on the early returns, and raised
        # NameError in finally when 'documents' was empty.
        open_tmp_files = []

        try:
            for document in submission['documents']:
                images_list = []
                for page in document['pages']:
                    if not page['corrected_image_url']:
                        continue
                    image_url_parts = page['corrected_image_url'].split('/')
                    image_blob = proxy.fetch_blob(image_url_parts[-1]).content
                    cv2_image = blob_to_cv2_image(image_blob)
                    h, w, _ = cv2_image.shape

                    rect_points = []

                    # Fields named 'REDACT_...' contribute boxes parsed from the
                    # first four query parameters of field_image_url
                    # (x0, y0, x1, y1 as fractions of the page size).
                    for field in document['document_fields']:
                        if field['page_id'] != page['id']:
                            continue
                        if not (
                            field['output_name'] and 'REDACT_' in field['output_name'].upper()
                        ):
                            continue
                        url = field['field_image_url']
                        if not url:
                            continue
                        query = url.split('?')[1]
                        coords = query.split('&')
                        values: list = [None, None, None, None]
                        for i, coord in enumerate(coords[:4]):
                            values[i] = float(coord.split('=')[1])
                        start_x, start_y, end_x, end_y = values
                        # Compare against None, not truthiness: a coordinate of
                        # 0.0 is a valid box edge at the page border (the old
                        # 'if start_x and ...' check silently skipped it).
                        if all(v is not None for v in values):
                            rect_points.append(
                                (
                                    (int(w * start_x), int(h * start_y)),
                                    (int(w * end_x), int(h * end_y)),
                                )
                            )

                    # Cells in 'REDACT...' columns carry a normalized
                    # bounding_box directly.
                    for table in document['document_tables']:
                        for row in table['rows']:
                            for cell in row['cells']:
                                if cell['page_id'] != page['id']:
                                    continue
                                if not (
                                    cell['column_name']
                                    and 'REDACT' in cell['column_name'].upper()
                                ):
                                    continue
                                bb = cell['bounding_box']
                                start_x, start_y, end_x, end_y = bb[0], bb[1], bb[2], bb[3]
                                if all(
                                    v is not None for v in (start_x, start_y, end_x, end_y)
                                ):
                                    rect_points.append(
                                        (
                                            (int(w * start_x), int(h * start_y)),
                                            (int(w * end_x), int(h * end_y)),
                                        )
                                    )

                    for start_point, end_point in rect_points:
                        cv2_image = cv2.rectangle(
                            cv2_image, start_point, end_point, color, thickness
                        )

                    tmp_file = tempfile.NamedTemporaryFile()
                    open_tmp_files.append(tmp_file)
                    tmp_file.write(cv2.imencode('.tiff', cv2_image)[1].tobytes())
                    tmp_file.flush()
                    os.fsync(tmp_file.fileno())
                    images_list.append(tmp_file)

                if images_list:
                    # tiffcp takes the output file as its last argument
                    joined_tiff = tempfile.NamedTemporaryFile()
                    open_tmp_files.append(joined_tiff)
                    command = [
                        'tiffcp',
                        *[image_file.name for image_file in images_list],
                        joined_tiff.name,
                    ]

                    run_result_join = subprocess.run(
                        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
                    )
                    if run_result_join.stderr:
                        # NOTE(review): tiffcp also writes warnings to stderr;
                        # confirm aborting on any stderr output is intended
                        return submission

                    pdf_file = tempfile.NamedTemporaryFile()
                    open_tmp_files.append(pdf_file)
                    command = ['tiff2pdf', '-o', pdf_file.name, '-F', joined_tiff.name]

                    run_result_pdf = subprocess.run(
                        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
                    )

                    if run_result_pdf.stderr:
                        return submission

                    with open(pdf_file.name, 'rb') as f:
                        blob_file = proxy.store_blob(
                            BlobCreateParams(
                                name='document_{}_redacted_pdf'.format(document['id']),
                                content=f.read(),
                            )
                        )

                    document['redacted_pdf'] = '/api/block_storage/{}/download'.format(
                        blob_file['uuid']
                    )
        finally:
            # Close every temp file regardless of how we exit; NamedTemporaryFile
            # deletes the underlying file on close.
            for tmp in open_tmp_files:
                tmp.close()
        return submission

    # Custom code block running the redaction logic on the API v5 submission
    redact_pdf = CodeBlock(
        reference_name='redact_pdf',
        code=_redact_pdf,
        code_input={'submission': load_submission.output()},
        title='Redact PDF',
        description='Redact a PDF based on field configuration',
    )

    def _mark_as_completed(submission: Any) -> Any:
        from datetime import datetime

        dt_completed = datetime.isoformat(datetime.utcnow())
        dt_completed_fmt = dt_completed + 'Z'

        for document in submission['documents']:
            document['state'] = 'complete'
            document['complete_time'] = dt_completed_fmt

            for page in document['pages']:
                page['state'] = 'complete'

        for page in submission['unassigned_pages']:
            page['state'] = 'complete'

        submission['state'] = 'complete'
        submission['complete_time'] = dt_completed_fmt

        return submission

    # Custom code block enables users to transform and validate extracted submission data
    # before Hyperscience sends it to downstream systems
    # In this example, user created a _mark_as_completed function to transform and validate
    # submission data
    # Notice that the _mark_as_completed function takes in a single argument which is passed
    # in using the code_input parameter
    mark_as_complete = CodeBlock(
        code=_mark_as_completed,
        code_input={'submission': redact_pdf.output()},
        title='Mark As Completed',
        description='Updated Transformed JSON to Completed State',
    )

    # Submission complete block finalizes submission processing and updates reporting data
    # Every flow needs a complete block because it initiates Quality Assurance tasks and
    # changes the submission's status to "Complete"
    # In this example, submission complete block receives the submission object from the
    # mark_as_complete custom code block (used both as payload and as submission)
    submission_complete = SubmissionCompleteBlock(
        payload=mark_as_complete.output(), submission=mark_as_complete.output()
    )

    # Output block allows users to send data extracted by this idp flow to other systems
    # for downstream processing
    # In this example, this is an empty output block that does not do anything by default
    outputs = IDPOutputsBlock(inputs={'submission': submission_bootstrap.output('submission')})

    # Assemble and return the complete redaction flow
    return Flow(
        # Flows should have a deterministic UUID ensuring cross-system consistency
        uuid=UUID('9174f4b5-2809-4a09-a25c-18b870b94945'),
        owner_email='flows.sdk@hyperscience.com',
        title='PDF Redaction (Field Based)',
        # Flow identifiers are globally unique
        manifest=IDPManifest(flow_identifier='PDF_REDACTION_FIELD_BASED'),
        # It is important to include all blocks that are instantiated here in the blocks
        # field and make sure they follow the order of the flow. For example, if machine
        # classification depends on the output of case collation, then case_collation must
        # come before machine_classification in this blocks array
        blocks=[
            submission_bootstrap,
            case_collation,
            machine_classification,
            manual_classification,
            machine_identification,
            manual_identification,
            machine_transcription,
            manual_transcription,
            flexible_extraction,
            load_submission,
            redact_pdf,
            mark_as_complete,
            submission_complete,
            outputs,
        ],
        input=get_idp_wf_inputs(idp_wf_config),
        description='PDF Redaction based on Field Configuration',
        triggers=IDPTriggers(),
        output={'submission': submission_complete.output()},
    )


# Script entry point: serialize the flow for packaging/import into the platform
if __name__ == '__main__':
    export_flow(flow=entry_point_flow())


IDP Full Page Transcription

This example transcribes text from documents.

Download idp_fpt_flow_v32.zip

IDP Full Page Transcription flow
from uuid import UUID

from flows_sdk.flows import Flow, Manifest
from flows_sdk.implementations.idp_v32.idp_blocks import (
    IDPFullPageTranscriptionBlock,
    IDPImageCorrectionBlock,
    IDPOutputsBlock,
    SubmissionBootstrapBlock,
    SubmissionCompleteBlock,
)
from flows_sdk.implementations.idp_v32.idp_values import IDPTriggers
from flows_sdk.package_utils import export_flow


def entry_point_flow() -> Flow:
    """Packaging entry point: return the flow definition for export."""
    return idp_fpt_workflow()


def idp_fpt_workflow() -> Flow:
    """Build a minimal full-page transcription flow (V32).

    Pipeline: bootstrap -> image correction -> full page transcription ->
    submission complete, with every QA task type disabled and empty
    trigger/output blocks (configure those via the UI instead).
    """
    # The idp flow basically processes, modifies and propagates the submission object from
    # block to block
    # Each block's processing result is usually included in the submission object

    # Submission bootstrap block initializes the submission object and prepares external images
    # or other submission data if needed
    submission_bootstrap = SubmissionBootstrapBlock(reference_name='submission_bootstrap')

    # Image correction normalizes the page images before transcription
    image_correction = IDPImageCorrectionBlock(
        reference_name='image_correction', submission=submission_bootstrap.output('submission')
    )

    # Transcribes the full text of each corrected page
    full_page_transcription = IDPFullPageTranscriptionBlock(
        reference_name='full_page_transcription', submission=image_correction.output('submission')
    )

    # Submission complete block finalizes submission processing and updates reporting data
    # Every flow needs a complete block because it initiates Quality Assurance tasks and
    # changes the submission's status to "Complete"
    # In this example, submission complete block receives the submission object from the
    # full page transcription block, with all QA sampling disabled
    submission_complete = SubmissionCompleteBlock(
        reference_name='complete_submission',
        payload=full_page_transcription.output('submission'),
        submission=full_page_transcription.output('submission'),
        nlc_qa_sampling_ratio=0,
        field_id_qa_enabled=False,
        field_id_qa_sampling_ratio=0,
        table_id_qa_enabled=False,
        table_id_qa_sampling_ratio=0,
        transcription_qa_enabled=False,
        transcription_qa_sampling_ratio=0,
        table_cell_transcription_qa_enabled=False,
        table_cell_transcription_qa_sample_rate=0,
    )

    # Output block allows users to send data extracted by this idp flow to other systems
    # for downstream processing
    # In this example, no output block is instantiated (blocks=[])
    # Setting up output blocks via UI and leaving this empty is recommended
    outputs = IDPOutputsBlock(
        inputs={'submission': submission_bootstrap.output('submission')}, blocks=[]
    )

    # Trigger block allows users to send data to idp flow via sources other than the User Interface
    # In this example, no trigger block is instantiated (blocks=[])
    # Setting up trigger blocks via UI and leaving this empty is recommended
    triggers = IDPTriggers(blocks=[])

    return Flow(
        # Flows should have a deterministic UUID ensuring cross-system consistency
        uuid=UUID('0dd837ae-44da-425a-b4be-9ffa3fc40eab'),
        owner_email='flows.sdk@hyperscience.com',
        title='IDP Full Page Transcription Flow Example (V32)',
        # Flow identifiers are globally unique
        manifest=Manifest(identifier='IDP_FULL_PAGE_TRANSCRIPTION_V32_FLOW_EXAMPLE', input=[]),
        triggers=triggers,
        # It is important to include all blocks that are instantiated here in the blocks
        # field and make sure they follow the order of the flow. For example, if machine
        # classification depends on the output of case collation, then case_collation must
        # come before machine_classification in this blocks array
        blocks=[
            submission_bootstrap,
            image_correction,
            full_page_transcription,
            submission_complete,
            outputs,
        ],
        description='IDP Full Page Transcription Flow Example (V32)',
        output={'submission': submission_complete.output()},
    )


# Script entry point: serialize the flow for packaging/import into the platform
if __name__ == '__main__':
    export_flow(flow=entry_point_flow())


IDP Custom Supervision V32

This example sends all fields in the submission to Custom Supervision.

Download idp_custom_supervision_v32.zip

IDP Custom Supervision V32
from typing import Any
from uuid import UUID

from flows_sdk.blocks import CodeBlock
from flows_sdk.flows import Flow
from flows_sdk.implementations.idp_v32.idp_blocks import (
    CaseCollationBlock,
    IdpCustomSupervisionBlock,
    IDPOutputsBlock,
    MachineClassificationBlock,
    MachineIdentificationBlock,
    MachineTranscriptionBlock,
    ManualClassificationBlock,
    ManualIdentificationBlock,
    ManualTranscriptionBlock,
    SubmissionBootstrapBlock,
    SubmissionCompleteBlock,
)
from flows_sdk.implementations.idp_v32.idp_values import (
    IDPManifest,
    IDPTriggers,
    get_idp_wf_config,
    get_idp_wf_inputs,
)
from flows_sdk.package_utils import export_flow


def entry_point_flow() -> Flow:
    """Package entry point: build and return the custom supervision flow."""
    flow = idp_workflow()
    return flow


def idp_workflow() -> Flow:
    """Build the IDP Custom Supervision example flow.

    Runs the standard IDP pipeline (bootstrap, case collation, machine/manual
    classification, identification and transcription), then sends every field
    in the submission to a Custom Supervision task assembled from two Custom
    Code blocks, re-collates cases, and completes the submission.
    """
    idp_wf_config = get_idp_wf_config()

    # The idp flow basically processes, modifies and propagates the submission object from
    # block to block
    # Each block's processing result is usually included in the submission object

    # Submission bootstrap block initializes the submission object and prepares external images
    # or other submission data if needed
    submission_bootstrap = SubmissionBootstrapBlock(reference_name='submission_bootstrap')

    # Case collation block groups files, documents and pages (from the submission) into cases
    # In this example, case collation block receives the submission object and the cases
    # information from submission bootstrap block
    case_collation = CaseCollationBlock(
        reference_name='machine_collation',
        submission=submission_bootstrap.output('submission'),
        cases=submission_bootstrap.output('api_params.cases'),
    )

    # Machine classification block automatically matches documents to structured, semi-structured
    # or additional layouts
    # In this example, machine classification block receives the submission object from
    # case collation block
    machine_classification = MachineClassificationBlock(
        reference_name='machine_classification',
        submission=case_collation.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Manual classification block allows keyers to manually match submissions to their layouts.
    # Keyers may perform manual classification if machine classification cannot automatically
    # match a submission to a layout with high confidence
    # In this example, manual classification block receives the submission object from machine
    # classification block
    manual_classification = ManualClassificationBlock(
        reference_name='manual_classification',
        submission=machine_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Machine identification automatically identifies fields and tables in the submission
    # In this example, machine identification block receives the submission object from manual
    # classification
    machine_identification = MachineIdentificationBlock(
        reference_name='machine_identification',
        submission=manual_classification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Manual identification allows keyers to complete field identification or table identification
    # tasks, where they draw bounding boxes around the contents of certain fields, table columns
    # or table rows. This identification process ensures that the system will be able to
    # transcribe the correct content in the upcoming transcription process
    # In this example, manual identification block receives the submission object from machine
    # identification
    manual_identification = ManualIdentificationBlock(
        reference_name='manual_identification',
        submission=machine_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Machine transcription automatically transcribes the content of your submission
    # In this example, machine transcription block receives the submission object from manual
    # identification
    machine_transcription = MachineTranscriptionBlock(
        reference_name='machine_transcription',
        submission=manual_identification.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Manual transcription lets your keyers manually enter the text found in fields or tables
    # that could not be automatically transcribed
    # In this example, manual transcription block receives the submission object from machine
    # transcription block
    manual_transcription = ManualTranscriptionBlock(
        reference_name='manual_transcription',
        submission=machine_transcription.output('submission'),
        api_params=submission_bootstrap.output('api_params'),
        # api_params is some submission processing settings obtained from submission bootstrap
        # that users do not have to worry about
    )

    # Here we define a function for a custom code block that gets all fields from a submission
    # to display them in custom supervision. It makes sure they appear only when an appropriate
    # layout has been selected by using the dependencies settings.
    def _get_fields_for_custom_sv_fn(submission: Any) -> Any:
        """Return one template field entry per unique layout field in the
        submission, hidden by default and shown only when the matching
        layout is selected."""
        template_fields = []
        # Track layout field UUIDs already emitted so each appears only once
        visited = set()
        for doc in submission.get('documents', []):
            # Sort by field id for a deterministic template field order
            fields = sorted(doc.get('document_fields', []), key=lambda f: f['id'])
            for field in fields:
                if field['layout_field_uuid'] in visited:
                    continue
                visited.add(field['layout_field_uuid'])
                template_fields.append(
                    {
                        'name': f"{field['field_name']}_{field['layout_field_uuid']}",
                        'type': 'transcription',
                        'layout_field_id': field['layout_field_uuid'],
                        # Hidden by default; the dependency below reveals the field
                        # only when its document's layout is the one selected
                        'ui': {'hidden': True},
                        'dependencies': [
                            {
                                'condition': {
                                    'properties': {'layoutId': {'const': doc['layout_uuid']}}
                                },
                                'override': {'ui': {'hidden': False}},
                            }
                        ],
                    }
                )
        return template_fields

    # Create our Custom Code block using our function from above.
    get_fields_for_custom_sv = CodeBlock(
        reference_name='get_fields_for_custom_sv',
        code=_get_fields_for_custom_sv_fn,
        code_input={'submission': manual_transcription.output('submission')},
        title='Get All Fields',
        description='Find all fields present in this submission to include in Custom Supervision',
    )

    # Using the output of our Custom Code block, we can construct our three column template
    # to define our Custom Supervision task.
    format_sv_template = CodeBlock(
        reference_name='format_sv_template',
        code=lambda fields: [
            {
                'name': 'three_column_template',
                'version': '1.0',
                'thumbnail': {'group_by_document': True, 'group_by_case': True},
                'action': [
                    {
                        'name': 'Transcribe Fields',
                        'display': 'All Identified Fields',
                        'input': fields,
                    },
                    {
                        'name': 'Decision and Case',
                        'display': 'Make Decision - Add Case',
                        'input': [
                            {
                                'name': 'document_decision',
                                'type': 'decision',
                                'title': 'Document Decision',
                                'relation': {'type': 'document'},
                                'schema': {
                                    'oneOf': [
                                        {'const': 'accept', 'title': 'Accept Document'},
                                        {'const': 'reject', 'title': 'Reject Document'},
                                    ]
                                },
                            },
                            {'name': 'assign_to_case', 'type': 'case_dropdown', 'title': ''},
                        ],
                    },
                ],
            }
        ],
        code_input={'fields': get_fields_for_custom_sv.output()},
    )

    # Create our Custom Supervision task using the template we constructed via Custom Code blocks.
    # This task will have all fields from the submission so that the user can edit them, as well
    # as a document level decision ("accept" or "reject" the document). It will also have the
    # option to add a document to a case.
    idp_custom_supervision = IdpCustomSupervisionBlock(
        reference_name='idp_custom_supervision',
        submission=manual_transcription.output('submission'),
        task_purpose='example_custom_supervision',
        supervision_template=format_sv_template.output(),
    )

    # Case collation block groups files, documents and pages (from the submission) into cases
    # In this example, case collation block receives the submission object and the cases
    # information from the custom supervision block. This block is required after Custom
    # Supervision in order to propagate case ID changes to the IDP database.
    custom_supervision_collation = CaseCollationBlock(
        reference_name='custom_supervision_collation',
        submission=idp_custom_supervision.output('submission'),
        cases=idp_custom_supervision.output('cases'),
    )

    # Submission complete block finalizes submission processing and updates reporting data
    # Every flow needs a complete block because it initiates Quality Assurance tasks and
    # changes the submission's status to "Complete"
    # In this example, submission complete block receives the submission object from the
    # custom supervision collation block
    submission_complete = SubmissionCompleteBlock(
        reference_name='complete_submission',
        submission=custom_supervision_collation.output('submission'),
    )

    # Output block allows users to send data extracted by this idp flow to other systems
    # for downstream processing
    # In this example, no output block is instantiated (blocks=[])
    # Setting up output blocks via UI and leaving this empty is recommended
    outputs = IDPOutputsBlock(
        inputs={'submission': submission_bootstrap.output('submission')}, blocks=[]
    )

    # Trigger block allows users to send data to idp flow via sources other than the User Interface
    # In this example, no trigger block is instantiated (blocks=[])
    # Setting up trigger blocks via UI and leaving this empty is recommended
    triggers = IDPTriggers(blocks=[])

    return Flow(
        # Flows should have a deterministic UUID ensuring cross-system consistency
        uuid=UUID('1b2c9a46-1701-40f6-93ba-6bec910638cd'),
        owner_email='flows.sdk@hyperscience.com',
        title='IDP Custom Supervision Block Flow Example (V32)',
        # Flow identifiers are globally unique
        manifest=IDPManifest(flow_identifier='IDP_CUSTOM_SUPERVISION_V32_EXAMPLE'),
        triggers=triggers,
        # It is important to include all blocks that are instantiated here in the blocks
        # field and make sure they follow the order of the flow. For example, if machine
        # classification depends on the output of case collation, then case_collation must
        # come before machine_classification in this blocks array
        blocks=[
            submission_bootstrap,
            case_collation,
            machine_classification,
            manual_classification,
            machine_identification,
            manual_identification,
            machine_transcription,
            manual_transcription,
            get_fields_for_custom_sv,
            format_sv_template,
            idp_custom_supervision,
            custom_supervision_collation,
            submission_complete,
            outputs,
        ],
        input=get_idp_wf_inputs(idp_wf_config),
        description='IDP Custom Supervision - All Fields (V32)',
        output={'submission': submission_complete.output()},
    )


# When run as a script, serialize the flow to a .flow file for import
# into a Hyperscience instance.
if __name__ == '__main__':
    export_flow(flow=entry_point_flow())


Code Snippets

Note

While code snippets are great, the Hyperscience Platform provides easy tools to help you build your own solutions. Make sure to read our Testing & Debugging guide below before diving in.

How do I iterate over multiple documents in a submission?

Block subclasses provided in IDP Library generally take a Submission object as a mandatory input parameter as well as output a modified copy of the Submission object. Each block provides a convenience method to access the Submission object:

# This example shows the output of MachineTranscriptionBlock (i.e. machine_transcription) being used as the input to a ManualTranscriptionBlock
manual_transcription = ManualTranscriptionBlock(
  reference_name='manual_transcription',
  submission=machine_transcription.output('submission'),
  ...
)

Warning

Always check the exact output of the preceding block! Some block output includes additional metadata and may require a different key, such as .output('results.submission'), to access the Submission object.

The Submission object is constructed as:

{
   "submission": {
      "id": "12345",
      "documents": [
         {
             "id": "1",
             "pages": [...], // list of pages
             "document_fields": [...] // list of document fields
         },
         ...
      ],
      "cases": []
   }
}

And so iterating over multiple documents in a submission is as easy as:

def _iter_docs(submission):
   for document in submission.get('documents', []):
      print(document["id"]) # Print the ID of each document in the submission
   return submission

# Wire the iteration function into the flow as a Custom Code block; the
# submission input comes from the preceding manual transcription block.
custom_code = CodeBlock(
   reference_name='code_block',
   code=_iter_docs,
   code_input={'submission': manual_transcription.output('submission')}
)

How do I compare fields across different documents?

Let’s assume we’re trying to find the maximum value of the “income” field across multiple documents in the same submission. Once transcribed, we iterate over all the transcribed fields in the submission and compare each value to the previous maximum:

def _compare_fields(submission):
   max_income = 0.0
   for document in submission.get('documents', []):
      for field in document.get('document_fields', []):
         if field.get('field_name') == 'income' and float(field.get('transcription')) > max_income:
            max_income = float(field.get('transcription'))

      print(max_income) # Print the maximum income across all documents
   return submission

# Wire the comparison function into the flow as a Custom Code block; the
# submission input comes from whichever block precedes it in the flow.
custom_code = CodeBlock(
   reference_name='code_block',
   code=_compare_fields,
   code_input={'submission': previous_block.output('submission')}
)

How do I store binary runtime data in the system?

Available in v35 and later. A code block may need to store some binary data such as images in order to transfer it to another block in the flow. To facilitate this, code blocks provide the _hs_block_instance system argument that exposes the store_blob and fetch_blob methods (for more info see flows_sdk.types.HsBlockInstance). store_blob will store the passed-in binary data and return an object that contains an identifier that can later be used to retrieve it using fetch_blob. This API can be used to store images and any type of data serialized into binary format, for example large JSONs or other text-based documents. See example code below:

from flows_sdk.types import HsBlockInstance
from flows_sdk.utils import workflow_input

def _transform_image(image_reference: str, _hs_block_instance: HsBlockInstance):
    """Fetch a stored image blob, transform it, store the result, and return
    the new blob's UUID so a downstream block can retrieve it."""
    from flows_sdk.types import StoreBlobRequest

    source_blob = _hs_block_instance.fetch_blob(image_reference)

    transformed = transform_image_lib_function(source_blob.content)

    request = StoreBlobRequest(name=f'transformed-{image_reference}', content=transformed)
    # store_blob returns a StoreBlobResponse carrying name and uuid parameters
    response = _hs_block_instance.store_blob(request)
    return response.uuid

# Custom Code block that transforms the image referenced by the flow input
# 'input_image_uuid'; its output is the UUID of the transformed blob.
transform_image = CodeBlock(
   reference_name='transform_image',
   code=_transform_image,
   code_input={'image_reference': workflow_input('input_image_uuid')}
)

def _use_transformed_image(blob_reference: str, _hs_block_instance: HsBlockInstance) -> Any:
    """Fetch the previously stored blob and return the size of its content in bytes."""
    blob = _hs_block_instance.fetch_blob(blob_reference)
    content = blob.content
    return len(content)

# Downstream Custom Code block that fetches the transformed blob via the UUID
# returned by transform_image and returns its size.
use_transformed_image = CodeBlock(
    reference_name='use_transformed_image',
    code=_use_transformed_image,
    code_input={'blob_reference': transform_image.output()}
)

Note

System arguments don’t need to be present in code_input, just the function definition. They will be automatically populated by the block at runtime.

How do I access flow metadata inside of a code block?

Available in v35 and later. Each flow run is associated with its unique execution context in the system that is described by certain metadata. Code blocks provide access to this metadata through the _hs_task system argument (for more info see flows_sdk.types.HsTask). Example usage:

from flows_sdk.types import HsTask

def use_flow_metadata(in_data: dict, _hs_task: HsTask):
    """Combine flow-run metadata with the block input into a single dash-separated string."""
    parts = [str(_hs_task.correlation_id), str(_hs_task.flow_run_id), str(in_data['value'])]
    return '-'.join(parts)

# Custom Code block demonstrating access to flow metadata; 'in_data' is taken
# from another block's 'data' output.
custom_code = CodeBlock(
   reference_name='use_flow_metadata',
   code=use_flow_metadata,
   code_input={'in_data': another_block.output('data')}
)

Note

System arguments don’t need to be present in code_input, just the function definition. They will be automatically populated by the block at runtime.

How do I log from a code block?

Available in v35 and later. By default, messages from a generic logger / print statements are visible in the application container. However, they do not appear in the UI, nor do they contain the details that are typically included in log messages (structured logging - e.g., ID of the submission being processed).

To add these features to your messages, you can use the _hs_block_instance parameter, which provides the log method (for more details, see flows_sdk.types.HsBlockInstance.log)

from flows_sdk.types import HsBlockInstance
from flows_sdk.utils import workflow_input

# we need a _hs_block_instance to call the logging methods
def log_text(text: str, _hs_block_instance: HsBlockInstance) -> None:
    """Emit *text* through the block instance's structured logger."""
    message = f'INFO level: {text}'
    _hs_block_instance.log(message)  # by default, logs have the INFO level.
    # raise Exception('something went wrong')  # this will log an exception if uncommented

# Custom Code block wiring log_text to the flow input named by FlowInputs.TEXT_TO_LOG
log_task_ccb = CodeBlock(
    reference_name='log_task_ccb',
    code=log_text,
    code_input={FlowInputs.TEXT_TO_LOG: workflow_input(FlowInputs.TEXT_TO_LOG)},
)

You can also download this example flow that showcases logging.

Logging flow
import sys
from uuid import UUID

from flows_sdk.blocks import CodeBlock, PythonBlock
from flows_sdk.flows import Flow, Manifest, Parameter
from flows_sdk.package_utils import export_flow
from flows_sdk.types import HsBlockInstance
from flows_sdk.utils import workflow_input

# Globally unique manifest identifier and deterministic UUID for the
# logging sample flow (fixed so re-imports update the same flow).
LOGGING_FLOW_IDENTIFIER = 'LOGGING_FLOW'
LOGGING_FLOW_UUID = UUID('cee432a8-30a3-4d07-a924-e6d87c923325')


def entry_point_flow() -> Flow:
    """Package entry point: build and return the logging sample flow."""
    flow = sample_logging_flow()
    return flow


class FlowInputs:
    """Names of the manifest inputs accepted by the logging sample flow."""

    TEXT_TO_LOG = 'text'


def sample_logging_flow() -> Flow:
    """Build a flow demonstrating leveled logging from code blocks.

    The same logging function is attached to both a CodeBlock and a
    PythonBlock so the structured log output of each block type can be
    compared. The flow takes a single string input (the text to log).
    """

    # we need a _hs_block_instance to call the logging methods
    def log_text(text: str, _hs_block_instance: HsBlockInstance) -> None:
        _hs_block_instance.log(f'DEBUG level: {text}', HsBlockInstance.LogLevel.DEBUG)
        _hs_block_instance.log(f'INFO level: {text}', HsBlockInstance.LogLevel.INFO)
        _hs_block_instance.log(f'WARNING level: {text}', HsBlockInstance.LogLevel.WARN)
        _hs_block_instance.log(f'ERROR level: {text}', HsBlockInstance.LogLevel.ERROR)
        # raise Exception('something went wrong')  # this will log an exception if uncommented

    log_task_ccb = CodeBlock(
        reference_name='log_task_ccb',
        code=log_text,
        code_input={FlowInputs.TEXT_TO_LOG: workflow_input(FlowInputs.TEXT_TO_LOG)},
    )
    log_task_python = PythonBlock(
        reference_name='log_task_python',
        code=log_text,
        code_input={FlowInputs.TEXT_TO_LOG: workflow_input(FlowInputs.TEXT_TO_LOG)},
    )

    # NOTE(review): the original passed a misspelled keyword 'depedencies={}'
    # to Flow; no other Flow(...) call in this file passes it, so the empty
    # dict has been dropped for consistency.
    return Flow(
        title='Logging sample flow',
        description='A simple Flow showcasing how to log in code blocks',
        blocks=[log_task_ccb, log_task_python],
        owner_email='flows.sdk@hyperscience.com',
        manifest=Manifest(
            identifier=LOGGING_FLOW_IDENTIFIER,
            input=[
                Parameter(
                    name=FlowInputs.TEXT_TO_LOG,
                    title='Text to log',
                    type='string',
                    optional=False,
                )
            ],
        ),
        uuid=LOGGING_FLOW_UUID,
        # Default value used when no input is supplied at run time
        input={FlowInputs.TEXT_TO_LOG: 'default-text'},
    )


if __name__ == '__main__':
    # An optional first CLI argument selects the export filename;
    # otherwise export_flow falls back to its default naming.
    export_filename = sys.argv[1] if len(sys.argv) > 1 else None
    export_flow(flow=entry_point_flow(), filename=export_filename)