tracker#

See:

class aws_textract_pipeline.tracker.Component(id: str)[source]#

Metadata for each component.

class aws_textract_pipeline.tracker.ComponentToTextractOutputResult(is_single_textract_api_call: bool, job_id: Optional[str], job_id_list: Optional[List[str]])[source]#

The returned object for creating textract output for all components of a document. This information will be used to parse the textract output data later.

Parameters:
  • is_single_textract_api_call – a single Textract API call is more efficient than one call per component, so we use a single call if the document fits within the quota. Otherwise, we split the document and make multiple API calls.

  • job_id – the textract job id, only available if we only made one API call.

  • job_id_list – the textract job id for each component, only available if we made multiple API calls.
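As a rough illustration of how a caller might consume this result, the sketch below uses a stand-in dataclass with the same three fields (it is not the library's class) and a hypothetical helper, job_ids_to_poll, that returns the job ids to check in either mode.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ResultSketch:
    """Stand-in mirroring the three documented fields; not the library class."""

    is_single_textract_api_call: bool
    job_id: Optional[str] = None
    job_id_list: Optional[List[str]] = None


def job_ids_to_poll(result: ResultSketch) -> List[str]:
    """Hypothetical helper: normalize both modes into one list of job ids."""
    if result.is_single_textract_api_call:
        # Single call: only ``job_id`` is populated.
        return [result.job_id] if result.job_id is not None else []
    # Multiple calls: one job id per component.
    return list(result.job_id_list or [])


single = ResultSketch(is_single_textract_api_call=True, job_id="job-1")
multi = ResultSketch(is_single_textract_api_call=False, job_id_list=["job-a", "job-b"])
```

Normalizing to a list keeps later parsing code from branching on the flag more than once.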

wait_document_analysis_job_to_succeed(bsm: BotoSesManager, delays: int = 5, timeout: int = 60, verbose: bool = True)[source]#

Wait for all Textract API calls for this document to succeed.
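The delays and timeout arguments suggest a polling loop. The sketch below is a generic version of such a loop, not the library's implementation: wait_until_succeeded and get_status are hypothetical names, the status strings follow the Textract JobStatus values, and the sleep function is injectable so the example runs without waiting.

```python
import time
from typing import Callable, Dict, List


def wait_until_succeeded(
    job_ids: List[str],
    get_status: Callable[[str], str],
    delays: int = 5,
    timeout: int = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll every job until all report SUCCEEDED; return the number of polls.

    Raises RuntimeError if any job fails, and TimeoutError once the next
    wait would exceed ``timeout`` seconds.
    """
    waited = 0
    polls = 0
    while True:
        polls += 1
        statuses = [get_status(job_id) for job_id in job_ids]
        if any(status == "FAILED" for status in statuses):
            raise RuntimeError("at least one Textract job failed")
        if all(status == "SUCCEEDED" for status in statuses):
            return polls
        if waited + delays > timeout:
            raise TimeoutError(f"jobs not finished after {waited} seconds")
        sleep(delays)
        waited += delays


# Fake status source for the demo: each job succeeds on its third poll.
calls: Dict[str, int] = {}


def fake_status(job_id: str) -> str:
    calls[job_id] = calls.get(job_id, 0) + 1
    return "SUCCEEDED" if calls[job_id] >= 3 else "IN_PROGRESS"


n_polls = wait_until_succeeded(["job-a", "job-b"], fake_status, sleep=lambda s: None)
```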

class aws_textract_pipeline.tracker.TextractOutputToTextAndJsonResult(text_list: List[str] = <factory>, json_list: List[dict] = <factory>)[source]#
class aws_textract_pipeline.tracker.Data(landing_uri: str, doc_type: str, features: ~typing.List[str] = <factory>, components: ~typing.List[~aws_textract_pipeline.tracker.Component] = <factory>, component_to_textract_output_result: ~typing.Optional[~aws_textract_pipeline.tracker.ComponentToTextractOutputResult] = None)[source]#

Additional data about this document.

Parameters:
  • landing_uri – the URI of the original S3 object in the landing bucket. Given a landing file, we can easily calculate the doc id, but we cannot go from the doc id back to the file. So we have to store this value and attach it to the S3 objects created in subsequent steps.

  • doc_type – the document type.

  • components

  • component_to_textract_output_result
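To see why landing_uri has to be stored, consider a doc id derived from a content hash. The sketch below assumes SHA-256 purely for illustration (the library's actual id scheme is not shown on this page), and calculate_doc_id is a hypothetical helper. The id can be recomputed from the file, but nothing in the id points back to where the file lived.

```python
import hashlib


def calculate_doc_id(content: bytes) -> str:
    """Hypothetical helper: derive a doc id from file content (SHA-256 assumed)."""
    return hashlib.sha256(content).hexdigest()


landing_uri = "s3://example-bucket/landing/invoice.pdf"  # made-up URI
content = b"%PDF-1.7 example bytes"

doc_id = calculate_doc_id(content)

# The same content always yields the same id, wherever the file is stored,
same_id_elsewhere = calculate_doc_id(content)

# so the id alone cannot recover the URI; keep the pair together instead.
tracker_data = {"doc_id": doc_id, "landing_uri": landing_uri}
```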

property n_components#

Number of components.

class aws_textract_pipeline.tracker.Errors(error: Optional[str] = None, traceback: Optional[str] = None)[source]#

Runtime error information for debugging.

Parameters:
  • error – error message.

  • traceback – Python traceback information.

class aws_textract_pipeline.tracker.StatusEnum(value)[source]#

Textract pipeline status enum.

class aws_textract_pipeline.tracker.StepEnum(value)[source]#

An enumeration.

class aws_textract_pipeline.tracker.MoveToNextStepResult(step: str, components: List[aws_textract_pipeline.tracker.Component] = <factory>, component_to_textract_output_result: Optional[aws_textract_pipeline.tracker.ComponentToTextractOutputResult] = None, textract_output_to_text_and_json_result: Optional[aws_textract_pipeline.tracker.TextractOutputToTextAndJsonResult] = None)[source]#
class aws_textract_pipeline.tracker.BaseStatusAndUpdateTimeIndex[source]#

Status Tracker GSI index, to allow lookup by status.

class aws_textract_pipeline.tracker.BaseTracker(hash_key: Optional[Any] = None, range_key: Optional[Any] = None, **attributes)[source]#

Status tracker DynamoDB table ORM model. It is the main class of the aws_textract_pipeline library. All the ETL logic is implemented as its methods.

Main ETL Logics:

Status tracking management:

Usage example:

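import pynamodb_mate as pm  # assumed to be what "pm" refers to below
from boto_session_manager import BotoSesManager

import aws_textract_pipeline.api as aws_textract_pipeline
from aws_textract_pipeline.tracker import StatusEnum

bsm = BotoSesManager()  # default boto3 credentials and region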

class StatusAndUpdateTimeIndex(aws_textract_pipeline.BaseStatusAndUpdateTimeIndex):
    pass

class Tracker(aws_textract_pipeline.BaseTracker):
    class Meta:
        table_name = "aws_textract_pipeline-tracker"
        region = bsm.aws_region
        billing_mode = pm.PAY_PER_REQUEST_BILLING_MODE

    status_and_update_time_index = StatusAndUpdateTimeIndex()

    # (optional) override default settings
    JOB_ID = "your_own_project_name"
    STATUS_ZERO_PAD = 6 # status code will be padded to 6 digits
    MAX_RETRY = 3 # for each task, you can retry 3 times
    LOCK_EXPIRE_SECONDS = 900 # lock will expire in 900 seconds
    DEFAULT_STATUS = StatusEnum.s01000_landing_to_raw_pending.value # default status at very beginning of this pipeline
    STATUS_ENUM = StatusEnum # you can extend the status enum if you want to add more status code and more ETL steps

You can find a more detailed example at https://github.com/MacHu-GWU/aws_textract_pipeline-project/blob/main/debug/test_pipeline.py

This implementation is based on the pynamodb_mate Status Tracker framework.
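The MAX_RETRY and LOCK_EXPIRE_SECONDS settings in the example can be read as two guards checked before a task runs. The sketch below illustrates that reading in plain Python; is_locked and can_run are hypothetical helpers, not functions of pynamodb_mate or of this library, and the exact comparison rules are an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

MAX_RETRY = 3
LOCK_EXPIRE_SECONDS = 900


def is_locked(lock_time: Optional[datetime], now: datetime) -> bool:
    """A lock counts only while it is younger than LOCK_EXPIRE_SECONDS."""
    if lock_time is None:
        return False
    return now - lock_time < timedelta(seconds=LOCK_EXPIRE_SECONDS)


def can_run(retry: int, lock_time: Optional[datetime], now: datetime) -> bool:
    """A task may run if it is not locked and still has retries left."""
    return not is_locked(lock_time, now) and retry < MAX_RETRY


now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
fresh_lock = now - timedelta(seconds=60)    # taken one minute ago
stale_lock = now - timedelta(seconds=1000)  # older than 900 seconds
```

An expiring lock means a worker that crashed mid-task does not block the document forever.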

STATUS_ENUM#

alias of StatusEnum

start_landing_to_raw(debug: bool = False)[source]#

Transition from “landing” to “raw”.

start_raw_to_component(debug: bool = False)[source]#

Transition from “raw” to “component”.

start_component_to_textract_output(debug: bool = False)[source]#

Transition from “component” to “textract output”.

start_textract_output_to_text_and_json(debug: bool = False)[source]#

Transition from “textract output” to “text and json”.

start_json_to_extracted_data(debug: bool = False)[source]#

Transition from “json” to “extracted data”.

start_extracted_data_to_hil_output(debug: bool = False)[source]#

Transition from “extracted data” to “hil output”.

start_hil_output_to_hil_post_process(debug: bool = False)[source]#

Transition from “hil output” to “hil post process”.
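One plausible shape for the start_* methods is a context manager that records in-progress, succeeded, or failed around the wrapped work. This is an assumption drawn from the status names on this page, not a confirmed description of the implementation. TrackerSketch is a hypothetical in-memory stand-in, and the codes 2020 and 2060 are guessed by analogy with the documented 2000 (pending) and 2040 (failed).

```python
from contextlib import contextmanager
from typing import Iterator, Optional


class TrackerSketch:
    """Hypothetical in-memory stand-in for a tracker item."""

    def __init__(self, status: int):
        self.status = status
        self.error: Optional[str] = None

    @contextmanager
    def start(self, in_progress: int, failed: int, succeeded: int) -> Iterator["TrackerSketch"]:
        self.status = in_progress
        try:
            yield self
        except Exception as e:
            # Record the failure on the item, then let the caller see it.
            self.status = failed
            self.error = repr(e)
            raise
        else:
            self.status = succeeded


ok = TrackerSketch(status=2000)
with ok.start(in_progress=2020, failed=2040, succeeded=2060):
    pass  # the raw-to-component work would go here

bad = TrackerSketch(status=2000)
try:
    with bad.start(in_progress=2020, failed=2040, succeeded=2060):
        raise ValueError("cannot split document")
except ValueError:
    pass  # the status and error are already recorded on ``bad``
```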

check_status_range(valid_status: List[int])[source]#

Check the current status before executing ETL logic. Raise an error if the current status doesn’t meet expectations. For example, in order to segment the raw document into components, the raw document has to be successfully copied from landing. We should not execute the raw to component logic unless the status code is one of the following:

  • StatusEnum.s01060_landing_to_raw_succeeded: we are ready.

  • StatusEnum.s02000_raw_to_component_pending: we are ready.

  • StatusEnum.s02040_raw_to_component_failed: we have tried but failed, and we are ready for the next try.

Parameters:

valid_status – list of valid status.
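A minimal sketch of the check described for check_status_range, using the raw to component example. The integer codes are assumed from the StatusEnum member names (s01060, s02000, s02040), and check_status is a hypothetical free function rather than the library's method.

```python
from typing import List

# Assumed numeric codes, read off the StatusEnum member names.
S01060_LANDING_TO_RAW_SUCCEEDED = 1060
S02000_RAW_TO_COMPONENT_PENDING = 2000
S02040_RAW_TO_COMPONENT_FAILED = 2040


def check_status(current_status: int, valid_status: List[int]) -> None:
    """Raise ValueError unless the current status is one of the valid ones."""
    if current_status not in valid_status:
        raise ValueError(
            f"status {current_status} is not in the valid statuses {valid_status}"
        )


valid_for_raw_to_component = [
    S01060_LANDING_TO_RAW_SUCCEEDED,
    S02000_RAW_TO_COMPONENT_PENDING,
    S02040_RAW_TO_COMPONENT_FAILED,
]

# A previously failed attempt is still allowed to retry.
check_status(2040, valid_for_raw_to_component)
```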

classmethod new_from_landing_doc(bsm: BotoSesManager, landing_doc: LandingDocument)[source]#

Create a new tracker item in DynamoDB based on the document in the landing bucket. During the creation of the tracker item, we calculate the doc_id based on the content of the document in the landing bucket.

Parameters:
landing_to_raw(bsm: BotoSesManager, workspace: Workspace, debug: bool = False)[source]#

Wrapper of the BaseTracker._landing_to_raw() method.

raw_to_component(bsm: BotoSesManager, workspace: Workspace, tmp_dir: Union[str, Path, Path] = PosixPath('/tmp'), clear_tmp_dir: bool = True, debug: bool = False) → List[Component][source]#

Wrapper of the BaseTracker._raw_to_component() method.

component_to_textract_output(bsm: BotoSesManager, workspace: Workspace, single_api_call: Optional[bool] = None, use_table_feature: bool = False, use_form_feature: bool = False, use_query_feature: bool = False, use_signature_feature: bool = False, use_layout_feature: bool = False, sns_topic_arn: Optional[str] = None, role_arn: Optional[str] = None, debug: bool = False) → ComponentToTextractOutputResult[source]#

Run the Textract document analysis API for each component.

Wrapper of the BaseTracker._component_to_textract_output() method.

Parameters:
  • bsm – boto_session_manager.BotoSesManager object.

  • workspace – aws_textract_pipeline.workspace.Workspace object.

  • single_api_call – if None, the library automatically decides whether to use a single API call or multiple API calls, based on the document size and the number of components. If True, only one API call is made for the whole document. If False, one API call is made for each component.

  • use_table_feature – at least one feature must be enabled.

  • use_form_feature – at least one feature must be enabled.

  • use_query_feature – at least one feature must be enabled.

  • use_signature_feature – at least one feature must be enabled.

  • use_layout_feature – at least one feature must be enabled.

  • sns_topic_arn – AWS SNS topic arn if you want to send a notification when the job is done.

  • role_arn – the role arn that allows Amazon Textract to publish to the SNS topic.

  • debug
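The five use_*_feature flags map naturally onto the FeatureTypes values of the Textract document analysis API (TABLES, FORMS, QUERIES, SIGNATURES, LAYOUT). The sketch below shows one way the "at least one feature must be enabled" rule could be enforced; build_feature_types is a hypothetical helper, not part of this library.

```python
from typing import List


def build_feature_types(
    use_table_feature: bool = False,
    use_form_feature: bool = False,
    use_query_feature: bool = False,
    use_signature_feature: bool = False,
    use_layout_feature: bool = False,
) -> List[str]:
    """Translate the boolean flags into a Textract FeatureTypes list."""
    flags = [
        (use_table_feature, "TABLES"),
        (use_form_feature, "FORMS"),
        (use_query_feature, "QUERIES"),
        (use_signature_feature, "SIGNATURES"),
        (use_layout_feature, "LAYOUT"),
    ]
    feature_types = [name for enabled, name in flags if enabled]
    if not feature_types:
        raise ValueError("at least one feature must be enabled")
    return feature_types


features = build_feature_types(use_table_feature=True, use_layout_feature=True)
```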

textract_output_to_text_and_json(bsm: BotoSesManager, workspace: Workspace, debug: bool = False) → TextractOutputToTextAndJsonResult[source]#

Parse the Textract output data and convert it into text and JSON views.

Wrapper of the BaseTracker._textract_output_to_text_and_json() method.

Parameters:
get_next_step() → StepEnum[source]#

Identify the next step of the pipeline based on the current status.
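A rough sketch of how a next step could be derived from a status code. It assumes the numbering pattern suggested by the documented members (s01000 pending, s01060 succeeded, s02000 pending, s02040 failed): the thousands part identifies the step and a trailing 60 means that step succeeded. StepSketch and next_step are hypothetical; the members of the real StepEnum are not listed on this page, so the step names are taken from the start_* methods.

```python
from enum import Enum


class StepSketch(Enum):
    """Hypothetical step names, taken from the start_* method names."""

    landing_to_raw = "landing_to_raw"
    raw_to_component = "raw_to_component"
    component_to_textract_output = "component_to_textract_output"
    textract_output_to_text_and_json = "textract_output_to_text_and_json"
    json_to_extracted_data = "json_to_extracted_data"
    extracted_data_to_hil_output = "extracted_data_to_hil_output"
    hil_output_to_hil_post_process = "hil_output_to_hil_post_process"


STEPS = list(StepSketch)  # definition order is pipeline order


def next_step(status: int) -> StepSketch:
    """Map a status code to the step that should run next."""
    stage, code = divmod(status, 1000)  # e.g. 2040 -> (2, 40)
    index = stage - 1                   # stage 1 is the first step
    if code == 60:                      # this step succeeded: advance
        index += 1
    if not 0 <= index < len(STEPS):
        raise ValueError(f"no next step for status {status}")
    return STEPS[index]


step_after_copy = next_step(1060)
```

Under these assumptions the codes 1060, 2000, and 2040 all resolve to the raw to component step, which matches the example given for check_status_range.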

move_to_next_stage(bsm: BotoSesManager, workspace: Workspace, tmp_dir: Union[str, Path, Path] = PosixPath('/tmp'), clear_tmp_dir: bool = True, single_api_call: Optional[bool] = None, use_table_feature: bool = False, use_form_feature: bool = False, use_query_feature: bool = False, use_signature_feature: bool = False, use_layout_feature: bool = False, sns_topic_arn: Optional[str] = None, role_arn: Optional[str] = None, debug: bool = False)[source]#

Move the document to the next step of the pipeline. Smartly execute one of the following steps:

exception DoesNotExist(msg: Optional[str] = None, cause: Optional[Exception] = None)#