tracker#
See:
- class aws_textract_pipeline.tracker.ComponentToTextractOutputResult(is_single_textract_api_call: bool, job_id: Optional[str], job_id_list: Optional[List[str]])[source]#
The returned object for creating textract output for all components of a document. This information will be used to parse the textract output data later.
- Parameters:
is_single_textract_api_call – it is more efficient to use a single Textract API call instead of multiple API calls, one per component. We try to use a single API call if the document fits the quota. Otherwise, we split the document and make multiple API calls.
job_id – the textract job id, only available if we only made one API call.
job_id_list – the textract job id for each component, only available if we made multiple API calls.
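As a rough illustration of how a caller might consume this result, the sketch below uses a stand-in dataclass with the same three fields (`ResultSketch` is not the library class) and a hypothetical helper, `collect_job_ids`, that returns the job ids to look up in either mode.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ResultSketch:
    """Stand-in mirroring the documented fields; not the library class."""

    is_single_textract_api_call: bool
    job_id: Optional[str] = None
    job_id_list: Optional[List[str]] = None


def collect_job_ids(result: ResultSketch) -> List[str]:
    """Hypothetical helper: return every Textract job id to look up."""
    if result.is_single_textract_api_call:
        # One job covers the whole document.
        return [result.job_id] if result.job_id is not None else []
    # One job per component.
    return list(result.job_id_list or [])
```

Keeping both fields optional mirrors the documented behavior that only one of them is populated, depending on how many API calls were made.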
- class aws_textract_pipeline.tracker.TextractOutputToTextAndJsonResult(text_list: List[str] = <factory>, json_list: List[dict] = <factory>)[source]#
- class aws_textract_pipeline.tracker.Data(landing_uri: str, doc_type: str, features: ~typing.List[str] = <factory>, components: ~typing.List[~aws_textract_pipeline.tracker.Component] = <factory>, component_to_textract_output_result: ~typing.Optional[~aws_textract_pipeline.tracker.ComponentToTextractOutputResult] = None)[source]#
Additional data about this document.
- Parameters:
landing_uri – the URI of the original S3 object in landing. Given a landing file, we can easily calculate the doc id, but we cannot go from the doc id back to the landing file, so we store this value and attach it to the S3 objects in subsequent steps.
doc_type – the document type.
components –
component_to_textract_output_result –
- property n_components#
Number of components.
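The `<factory>` defaults in the signature suggest dataclass-style fields created per instance. A minimal sketch under that assumption, using a stand-in class (`DataSketch`), plain strings in place of `Component` objects, and a made-up S3 URI:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class DataSketch:
    """Stand-in for the documented Data fields; components are plain strings here."""

    landing_uri: str
    doc_type: str
    features: List[str] = field(default_factory=list)
    components: List[str] = field(default_factory=list)

    @property
    def n_components(self) -> int:
        # Assumed to be the length of the components list.
        return len(self.components)


# Hypothetical URI and component ids, for illustration only.
data = DataSketch(landing_uri="s3://example-bucket/landing/file.pdf", doc_type="pdf")
data.components.extend(["000001", "000002"])
```

Using `default_factory` gives each instance its own list, so appending components to one document does not affect another.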
- class aws_textract_pipeline.tracker.Errors(error: Optional[str] = None, traceback: Optional[str] = None)[source]#
Runtime error information for debugging.
- Parameters:
error – error message.
traceback – Python traceback information.
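One way such a record could be filled in is sketched below. `ErrorsSketch` and `capture_errors` are hypothetical names used for illustration; how the library itself populates these fields is not shown in this reference.

```python
import traceback
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ErrorsSketch:
    """Stand-in mirroring the documented fields; not the library class."""

    error: Optional[str] = None
    traceback: Optional[str] = None


def capture_errors(func: Callable[[], object]) -> ErrorsSketch:
    """Hypothetical helper: run func and record any exception it raises."""
    try:
        func()
    except Exception as e:
        # Store a short message plus the full formatted traceback.
        return ErrorsSketch(error=repr(e), traceback=traceback.format_exc())
    return ErrorsSketch()


errors = capture_errors(lambda: 1 / 0)
```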
- class aws_textract_pipeline.tracker.MoveToNextStepResult(step: str, components: List[aws_textract_pipeline.tracker.Component] = <factory>, component_to_textract_output_result: Optional[aws_textract_pipeline.tracker.ComponentToTextractOutputResult] = None, textract_output_to_text_and_json_result: Optional[aws_textract_pipeline.tracker.TextractOutputToTextAndJsonResult] = None)[source]#
- class aws_textract_pipeline.tracker.BaseStatusAndUpdateTimeIndex[source]#
Status Tracker GSI index, to allow lookup by status.
- class aws_textract_pipeline.tracker.BaseTracker(hash_key: Optional[Any] = None, range_key: Optional[Any] = None, **attributes)[source]#
Status tracker DynamoDB table ORM model. It is the main class of the aws_textract_pipeline library. All the ETL logic is implemented as its methods.
Main ETL logic:
Status tracking management:
Usage example:
    import aws_textract_pipeline.api as aws_textract_pipeline

    class StatusAndUpdateTimeIndex(aws_textract_pipeline.BaseStatusAndUpdateTimeIndex):
        pass

    class Tracker(aws_textract_pipeline.BaseTracker):
        class Meta:
            table_name = "aws_textract_pipeline-tracker"
            region = bsm.aws_region
            billing_mode = pm.PAY_PER_REQUEST_BILLING_MODE

        status_and_update_time_index = StatusAndUpdateTimeIndex()

        # (optional) override default settings
        JOB_ID = "your_own_project_name"
        STATUS_ZERO_PAD = 6  # status code will be padded to 6 digits
        MAX_RETRY = 3  # for each task, you can retry 3 times
        LOCK_EXPIRE_SECONDS = 900  # lock will expire in 900 seconds
        DEFAULT_STATUS = StatusEnum.s01000_landing_to_raw_pending.value  # default status at very beginning of this pipeline
        STATUS_ENUM = StatusEnum  # you can extend the status enum if you want to add more status code and more ETL steps
You can find a more detailed example at https://github.com/MacHu-GWU/aws_textract_pipeline-project/blob/main/debug/test_pipeline.py
This implementation is based on the pynamodb_mate Status Tracker framework.
- STATUS_ENUM#
alias of StatusEnum
- start_component_to_textract_output(debug: bool = False)[source]#
Transition from “component” to “textract output”.
- start_textract_output_to_text_and_json(debug: bool = False)[source]#
Transition from “textract output” to “text and json”.
- start_json_to_extracted_data(debug: bool = False)[source]#
Transition from “json” to “extracted data”.
- start_extracted_data_to_hil_output(debug: bool = False)[source]#
Transition from “extracted data” to “hil output”.
- start_hil_output_to_hil_post_process(debug: bool = False)[source]#
Transition from “hil output” to “hil post process”.
- check_status_range(valid_status: List[int])[source]#
Check the current status before executing ETL logic. Raise an error if the current status doesn’t meet expectations. For example, in order to segment the raw document into components, the raw document has to be successfully copied from landing. If the status code is not one of the following, we should not execute the raw to component logic:
- StatusEnum.s01060_landing_to_raw_succeeded: we are ready.
- StatusEnum.s02000_raw_to_component_pending: we are ready.
- StatusEnum.s02040_raw_to_component_failed: we have tried but failed, and we are ready for the next try.
- Parameters:
valid_status – list of valid status.
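The check described above can be sketched as a simple membership test. This is a standalone function rather than the library method. The enum holds only the three statuses named above, with numeric values assumed from their names, and the exception type (`ValueError`) is also an assumption.

```python
from enum import Enum
from typing import List


class StatusEnumSketch(int, Enum):
    """Only the three members named above; numeric values are assumed from the names."""

    s01060_landing_to_raw_succeeded = 1060
    s02000_raw_to_component_pending = 2000
    s02040_raw_to_component_failed = 2040


def check_status_range(current_status: int, valid_status: List[int]) -> None:
    """Raise if current_status is not one of valid_status."""
    if current_status not in valid_status:
        raise ValueError(
            f"status {current_status} is not one of the valid statuses {valid_status}"
        )


# Statuses from which the raw to component logic may run.
valid = [status.value for status in StatusEnumSketch]
check_status_range(2000, valid)  # passes silently
```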
- classmethod new_from_landing_doc(bsm: BotoSesManager, landing_doc: LandingDocument)[source]#
Create a new tracker item in DynamoDB based on the document in the landing bucket. During the creation of the tracker item, we calculate the doc_id based on the content of the document in the landing bucket.
- Parameters:
bsm – boto_session_manager.BotoSesManager object.
landing_doc – aws_textract_pipeline.landing.LandingDocument object.
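To make the idea of a content-based doc_id concrete, here is a minimal sketch that hashes the document bytes. The choice of SHA-256 and the function name `calculate_doc_id` are assumptions; this reference does not state which algorithm the library uses.

```python
import hashlib


def calculate_doc_id(content: bytes) -> str:
    """Hypothetical: derive a deterministic id from the document bytes."""
    return hashlib.sha256(content).hexdigest()


doc_id = calculate_doc_id(b"%PDF-1.7 example bytes")
```

A hash like this is deterministic but one-way: the same content always yields the same id, while the id alone cannot be mapped back to the landing object, which is consistent with storing `landing_uri` separately.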
- landing_to_raw(bsm: BotoSesManager, workspace: Workspace, debug: bool = False)[source]#
Wrapper of the BaseTracker._landing_to_raw() method.
- raw_to_component(bsm: BotoSesManager, workspace: Workspace, tmp_dir: Union[str, Path, Path] = PosixPath('/tmp'), clear_tmp_dir: bool = True, debug: bool = False) List[Component][source]#
Wrapper of the BaseTracker._raw_to_component() method.
- component_to_textract_output(bsm: BotoSesManager, workspace: Workspace, single_api_call: Optional[bool] = None, use_table_feature: bool = False, use_form_feature: bool = False, use_query_feature: bool = False, use_signature_feature: bool = False, use_layout_feature: bool = False, sns_topic_arn: Optional[str] = None, role_arn: Optional[str] = None, debug: bool = False) ComponentToTextractOutputResult[source]#
Run textract analysis document API for each component.
Wrapper of the BaseTracker._component_to_textract_output() method.
- Parameters:
bsm – boto_session_manager.BotoSesManager object.
workspace – aws_textract_pipeline.workspace.Workspace object.
single_api_call – if None, the library will automatically decide whether to use single API call or multiple API calls based on the document size and number of components. If True, only one API call will be made for the whole document. If False, multiple API calls will be made for each component.
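use_table_feature – enable the table feature. At least one of the use_*_feature flags must be enabled.
use_form_feature – enable the form feature. At least one of the use_*_feature flags must be enabled.
use_query_feature – enable the query feature. At least one of the use_*_feature flags must be enabled.
use_signature_feature – enable the signature feature. At least one of the use_*_feature flags must be enabled.
use_layout_feature – enable the layout feature. At least one of the use_*_feature flags must be enabled.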
sns_topic_arn – AWS SNS topic arn if you want to send a notification when the job is done.
role_arn – the role arn that allows Amazon Textract to publish to the SNS topic.
debug –
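The sketch below shows one plausible way the feature flags and SNS settings could translate into Amazon Textract request arguments. The helper names are hypothetical and the library's internal request assembly is not shown in this reference. The `FeatureTypes` values and the `NotificationChannel` keys follow the Textract `StartDocumentAnalysis` API.

```python
from typing import Dict, List, Optional


def build_feature_types(
    use_table_feature: bool = False,
    use_form_feature: bool = False,
    use_query_feature: bool = False,
    use_signature_feature: bool = False,
    use_layout_feature: bool = False,
) -> List[str]:
    """Hypothetical helper: map the use_*_feature flags to Textract FeatureTypes."""
    flags = [
        (use_table_feature, "TABLES"),
        (use_form_feature, "FORMS"),
        (use_query_feature, "QUERIES"),
        (use_signature_feature, "SIGNATURES"),
        (use_layout_feature, "LAYOUT"),
    ]
    feature_types = [name for enabled, name in flags if enabled]
    if not feature_types:
        raise ValueError("at least one feature must be enabled")
    return feature_types


def build_notification_channel(
    sns_topic_arn: Optional[str],
    role_arn: Optional[str],
) -> Optional[Dict[str, str]]:
    """Hypothetical helper: build the NotificationChannel argument, if requested."""
    if sns_topic_arn is None:
        return None
    if role_arn is None:
        raise ValueError("role_arn is required when sns_topic_arn is given")
    return {"SNSTopicArn": sns_topic_arn, "RoleArn": role_arn}
```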
- textract_output_to_text_and_json(bsm: BotoSesManager, workspace: Workspace, debug: bool = False) TextractOutputToTextAndJsonResult[source]#
Parse the Textract output data and convert it into text and JSON views.
Wrapper of the BaseTracker._textract_output_to_text_and_json() method.
- Parameters:
bsm – boto_session_manager.BotoSesManager object.
workspace – aws_textract_pipeline.workspace.Workspace object.
debug –
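As a rough idea of what a text view of Textract output can look like, the sketch below joins the text of every `LINE` block in a response. `blocks_to_text` and the sample payload are hypothetical, and the library's actual conversion may differ. The `Blocks`, `BlockType`, and `Text` keys follow the Textract response format.

```python
from typing import Any, Dict, List


def blocks_to_text(textract_output: Dict[str, Any]) -> str:
    """Join the text of every LINE block, one line per block."""
    lines: List[str] = [
        block["Text"]
        for block in textract_output.get("Blocks", [])
        if block.get("BlockType") == "LINE"
    ]
    return "\n".join(lines)


# Made-up, heavily trimmed response used only for illustration.
sample_output = {
    "Blocks": [
        {"BlockType": "PAGE"},
        {"BlockType": "LINE", "Text": "Invoice 42"},
        {"BlockType": "WORD", "Text": "Invoice"},
        {"BlockType": "WORD", "Text": "42"},
        {"BlockType": "LINE", "Text": "Total: 10.00"},
    ]
}
text = blocks_to_text(sample_output)
```

Filtering on `LINE` blocks avoids repeating each word, since `WORD` blocks carry the same text at a finer granularity.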
- get_next_step() StepEnum[source]#
Identify the next step of the pipeline based on the current status.
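A minimal sketch of this kind of lookup is shown below, as a standalone function rather than the library method. It covers only the status codes named elsewhere in this reference. The numeric values, the `StepSketch` members, and the rule that a failed step is retried are all assumptions made for illustration.

```python
from enum import Enum


class StepSketch(str, Enum):
    """Hypothetical subset of pipeline steps; not the library's StepEnum."""

    landing_to_raw = "landing_to_raw"
    raw_to_component = "raw_to_component"


# Hypothetical lookup table; status values are assumed from the status names.
NEXT_STEP_BY_STATUS = {
    1000: StepSketch.landing_to_raw,    # s01000_landing_to_raw_pending
    1060: StepSketch.raw_to_component,  # s01060_landing_to_raw_succeeded
    2000: StepSketch.raw_to_component,  # s02000_raw_to_component_pending
    2040: StepSketch.raw_to_component,  # s02040_raw_to_component_failed (retry)
}


def get_next_step(status: int) -> StepSketch:
    """Return the step to run for the given status, or raise if unknown."""
    try:
        return NEXT_STEP_BY_STATUS[status]
    except KeyError:
        raise ValueError(f"no next step known for status {status}") from None
```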
- move_to_next_stage(bsm: BotoSesManager, workspace: Workspace, tmp_dir: Union[str, Path, Path] = PosixPath('/tmp'), clear_tmp_dir: bool = True, single_api_call: Optional[bool] = None, use_table_feature: bool = False, use_form_feature: bool = False, use_query_feature: bool = False, use_signature_feature: bool = False, use_layout_feature: bool = False, sns_topic_arn: Optional[str] = None, role_arn: Optional[str] = None, debug: bool = False)[source]#
Move the document to the next step of the pipeline. Smartly execute one of the following steps: