API Documentation

Kodexa is a Python framework to enable flexible data engineering with semi-structured and unstructured documents and data.

Kodexa Cloud

Provides out-of-the-box integration with the Kodexa Cloud, enabling access to the content services available there

class KodexaCloudPipeline(slug, version=None, attach_source=True, options=None, auth=None, cloud_url='https://cloud.kodexa.com', access_token=None)[source]

Allows you to interact with a pipeline that has been deployed in the Kodexa Cloud

class KodexaCloudService(slug, version=None, attach_source=False, options={}, auth=[], cloud_url='https://cloud.kodexa.com', access_token=None)[source]

Allows you to interact with a content service that has been deployed in the Kodexa Cloud

Connectors

Connectors provide a way to access documents (files or otherwise) from a source, and they form the starting point for Pipelines

class FolderConnector(path, file_filter='*')[source]
class UrlConnector(url, headers=None)[source]

Core Model

The core model provides the object structure for Documents, ContentNodes and Features which is used as the foundation for working with unstructured data in the framework.

To create a new instance of a Document you will need to provide a DocumentMetadata object:

>>> document = Document(DocumentMetadata())
class Document(metadata=None, content_node: kodexa.model.model.ContentNode = None, source=<kodexa.model.model.SourceMetadata object>)[source]

A Document is a collection of metadata and a set of content nodes.

add_mixin(mixin)[source]

Add the given mixin to this document. This will apply the mixin to all the content nodes, and also register it with the document so that future invocations of create_node will ensure the node has the mixin applied.

>>> document.add_mixin('spatial')
create_node(type: str, content: str = None, virtual: bool = False, parent: kodexa.model.model.ContentNode = None, index: int = 0)[source]

Creates a new node for the document. This doesn't add the node to the document, but it does ensure that any mixins that have been applied to the document will also be available on the new node.

>>> document.create_node(type='page')
<kodexa.model.model.ContentNode object at 0x7f80605e53c8>
static from_dict(doc_dict)[source]

Build a new document from a dictionary

>>> Document.from_dict(doc_dict)
static from_json(json_string)[source]

Create an instance of a Document from a JSON string

>>> document = Document.from_json(json_string)
static from_kdxa(file_path)[source]

Read a .kdxa file from the given file_path and return a Document

>>> document = Document.from_kdxa('my-document.kdxa')
Parameters:file_path – the path to the .kdxa file
static from_msgpack(bytes)[source]

Create an instance of a Document from a msgpack byte array

>>> document = Document.from_msgpack(bytes)
get_mixins()[source]

Get the list of mixins that have been enabled on this document.

>>> document.get_mixins()
['spatial','finders']
get_root()[source]

Get the root content node for the document (same as content_node)

>>> node = document.get_root()
to_dict()[source]

Convert this document object structure into a simple set of dictionaries

>>> document.to_dict()
to_json()[source]

Convert this document object structure into a JSON object

>>> document.to_json()
to_kdxa(file_path)[source]

Write the document to the kdxa format (msgpack) which can be used with the Kodexa platform

>>> document.to_kdxa('my-document.kdxa')
Parameters:file_path – the path to the .kdxa file you wish to create
to_msgpack()[source]

Convert this document object structure into a msgpack byte array

>>> document.to_msgpack()
to_text()[source]

Convert this document object structure into a text representation, which can be useful when trying to review the structure.

>>> document.to_text()
class DocumentMetadata(*args, **kwargs)[source]

A flexible dict-based approach to capturing metadata for the document
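The "flexible dict" pattern can be sketched in plain Python. This is an illustrative stand-in for the idea (a dict whose keys double as attributes), not the actual kodexa implementation:

```python
# Illustrative sketch of a dict-backed metadata object with attribute
# access -- a stand-in for the pattern, not kodexa's DocumentMetadata.
class FlexibleMetadata(dict):
    """A dict whose keys can also be read and written as attributes."""

    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        self[name] = value


meta = FlexibleMetadata()
meta.source = "invoice.pdf"   # attribute-style write
meta["language"] = "en"       # dict-style write
print(meta.language)          # attribute-style read -> en
print(meta["source"])         # dict-style read -> invoice.pdf
```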

class ContentNode(document, type, content='', content_parts=[])[source]

A ContentNode identifies a section of the document containing a logical grouping of information

The node will have content and can include any number of features.

You should always create a node using the Document create_node method to ensure that the correct mixins are applied.

>>> new_page = document.create_node(type='page')
<kodexa.model.model.ContentNode object at 0x7f80605e53c8>
>>> current_content_node.add_child(new_page)

or

>>> new_page = document.create_node(type='page', content='This is page 1')
<kodexa.model.model.ContentNode object at 0x7f80605e53c8>
>>> current_content_node.add_child(new_page)
add_child(child, index=None)[source]

Add a ContentNode as a child of this ContentNode

>>> new_page = document.create_node(type='page')
<kodexa.model.model.ContentNode object at 0x7f80605e53c8>
>>> current_content_node.add_child(new_page)
add_feature(feature_type, name, value, single=True, serialized=False)[source]

Add a new feature to this ContentNode.

You will need to provide the feature type, the name of the feature and then the value.

Note that if the feature already exists this will add another value to it, so the feature value may become a list

>>> new_page = document.create_node(type='page')
<kodexa.model.model.ContentNode object at 0x7f80605e53c8>
>>> new_page.add_feature('pagination','pageNum',1)
get_children()[source]

Returns a list of the children of this node

>>> new_page = document.create_node(type='page')
<kodexa.model.model.ContentNode object at 0x7f80605e53c8>
>>> new_page.get_children()
[]
get_content()[source]

Returns the content of the node

>>> new_page.get_content()
"This is page one"
get_feature(feature_type, name)[source]

Gets the value for the given feature.

You will need to provide the type and name of the feature. If no feature is found you will get None

>>> new_page.get_feature('pagination','pageNum')
1
get_feature_value(feature_type, name)[source]

Returns the assigned value for a given feature

You will need to provide the type and name of the feature. If the feature is present it will return the value otherwise it will return None.

>>> new_page.get_feature_value('pagination','pageNum')
1
get_features()[source]

Returns a list of the features on this content node

Returns:a list of the features present
get_features_of_type(feature_type)[source]

Return a list of all the features of a specific type

>>> new_page.get_features_of_type('tag')
[]
Returns:a list of the features of the given type
get_type()[source]

Returns the type of the node

>>> new_page.get_type()
"page"
has_feature(feature_type, name)[source]

Determines if the feature with the given name and type exists on this content node

You will need to provide the type and name of the feature. If the feature is present it will return True, else it will return False

>>> new_page.has_feature('pagination','pageNum')
True
Returns:True if the feature is present
remove_feature(feature_type, name)[source]

Removes the feature with the given name and type from this content node

You will need to provide the type and name of the feature

>>> new_page.remove_feature('pagination','pageNum')
set_feature(feature_type, name, value)[source]

Sets a feature on this ContentNode, replacing any existing value

You will need to provide the feature type, the name of the feature and then the value.

Note this will replace any matching feature (i.e. with the same type and name)

>>> new_page = document.create_node(type='page')
<kodexa.model.model.ContentNode object at 0x7f80605e53c8>
>>> new_page.set_feature('pagination','pageNum',1)
to_dict()[source]

Convert the ContentNode, and all its children into a simple dictionary

>>> new_page = document.create_node(type='page')
<kodexa.model.model.ContentNode object at 0x7f80605e53c8>
>>> current_content_node.to_dict()
to_json()[source]

Convert this node structure into a JSON object

>>> node.to_json()
to_text()[source]

Convert this node structure into a text representation, which can be useful when trying to review the structure.

>>> node.to_text()

Pipeline

A Pipeline is a way to bring together a Connector, a set of steps, and a Sink to perform data cleansing, normalization, analysis and more.
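The connector-to-steps-to-sink flow can be sketched with plain-Python stand-ins (these hypothetical classes only illustrate the pattern; they are not the kodexa Pipeline implementation):

```python
# Minimal sketch of the connector -> steps -> sink flow.
# Stand-in classes for illustration only, not kodexa's implementation.
class ListConnector:
    """Yields pre-built 'documents' (here, plain dicts) one at a time."""
    def __init__(self, documents):
        self.documents = documents

    def __iter__(self):
        return iter(self.documents)


class UppercaseStep:
    """A step transforms a document and returns it."""
    def process(self, doc):
        doc["content"] = doc["content"].upper()
        return doc


class ListSink:
    """Collects the final documents."""
    def __init__(self):
        self.documents = []

    def sink(self, doc):
        self.documents.append(doc)


def run_pipeline(connector, steps, sink):
    # Each document flows through every step in order, then into the sink.
    for doc in connector:
        for step in steps:
            doc = step.process(doc)
        sink.sink(doc)


sink = ListSink()
run_pipeline(ListConnector([{"content": "hello"}]), [UppercaseStep()], sink)
print(sink.documents[0]["content"])  # -> HELLO
```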

class Pipeline(connector, name='Default', stop_on_exception=True, logging_level=20)[source]

A pipeline represents a way to bring together parts of the Kodexa framework to solve a specific problem.

When you create a Pipeline you must provide the connector that will be used to source the documents.

>>> pipeline = Pipeline(FolderConnector(path='/tmp/', file_filter='example.pdf'))
Parameters:
  • connector – the connector that will be the starting point for the pipeline
  • name – the name of the pipeline (default ‘Default’)
  • stop_on_exception – Should the pipeline raise exceptions and stop (default True)
  • logging_level – The logging level of the pipeline (default INFO)
add_step(step)[source]

Add the given step to the current pipeline

>>> pipeline = Pipeline(FolderConnector(path='/tmp/', file_filter='example.pdf'))
>>> pipeline.add_step(ExampleStep())

Note that it is also possible to add a function as a step, for example

>>> def my_function(doc):
>>>      doc.metadata.fishstick = 'foo'
>>>      return doc
>>> pipeline.add_step(my_function)
Parameters:step – the step to add
add_store(name, store)[source]

Add the store to the pipeline so that it is available to the pipeline's steps

>>> pipeline = Pipeline(FolderConnector(path='/tmp/', file_filter='example.pdf'))
>>> pipeline.add_store("test-store", InMemoryObjectStore())
Parameters:
  • name – the name of the store (to refer to it)
  • store – the store that should be added
run()[source]

Run the current pipeline; note that you must have a sink in place for the pipeline to run

>>> pipeline = Pipeline(FolderConnector(path='/tmp/', file_filter='example.pdf'))
>>> pipeline.set_sink(ExampleSink())
>>> pipeline.run()
Returns:The context from the run
set_sink(sink)[source]

Set the sink you wish to use, note that it will replace any currently assigned sink

>>> pipeline = Pipeline(FolderConnector(path='/tmp/', file_filter='example.pdf'))
>>> pipeline.set_sink(ExampleSink())
Parameters:sink – the sink for the pipeline
class PipelineContext(content_provider=<kodexa.pipeline.pipeline.InMemoryContentProvider object>, store_provider=<kodexa.pipeline.pipeline.InMemoryStoreProvider object>, existing_content_objects=None, context=None)[source]

A PipelineContext is created when you create a pipeline, and it provides a way to access information about the pipeline that is running. It can be made available to steps/functions so they can interact with it.

It also provides access to the ‘stores’ that have been added to the pipeline

add_store(name, store)[source]

Add a store with given name to the context

Parameters:
  • name – the name to refer to the store with
  • store – the instance of the store
get_store(name, default=None)[source]

Get a store with given name from the context

Parameters:
  • name – the name to refer to the store with
  • default – optionally, a default store to register and return if no store with that name exists
Returns:

the store, or None if not available

get_store_names()[source]

Return the list of store names in context

Returns:the list of store names
set_output_document(output_document)[source]

Set the output document from the pipeline

Parameters:output_document – the final output document from the pipeline
Returns:the final output document
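The named-store behaviour of the context can be sketched as a simple registry (an illustrative stand-in, not the kodexa PipelineContext implementation):

```python
# Illustrative sketch of named-store lookup on a pipeline context.
# Stand-in class for illustration; not kodexa's PipelineContext.
class SimpleContext:
    def __init__(self):
        self._stores = {}

    def add_store(self, name, store):
        self._stores[name] = store

    def get_store(self, name, default=None):
        # If the store is missing and a default is supplied, register
        # the default and return it; otherwise return the store or None.
        if name not in self._stores and default is not None:
            self._stores[name] = default
        return self._stores.get(name)

    def get_store_names(self):
        return list(self._stores)


ctx = SimpleContext()
ctx.add_store("output", [])
ctx.get_store("output").append({"row": 1})
print(ctx.get_store_names())  # -> ['output']
```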
class PipelineStatistics[source]

A set of statistics for the processed document

Attributes: documents_processed, document_exceptions

processed_document(document)[source]

Update statistics based on this document completing processing

Parameters:document – the document that has been processed

Sinks

Sinks are the endpoint of a Pipeline and allow the final output of the pipeline to be either stored or written out

class InMemoryDocumentSink[source]

An in-memory document sink can be used for testing where you want to capture a set of documents as a basic in-memory list and then access them

get_document(index)[source]

Get document at given index

Parameters:index – index to get the document at
sink(document)[source]

Adds the document to the sink

Parameters:document – document to add

Stores

Stores are persistence components for Documents; typically they can act as either a Connector or a Sink

class JsonDocumentStore(store_path: str, force_initialize: bool = False)[source]

An implementation of a document store that uses JSON files to store the documents and maintains an index.json containing some basic details of the documents

add(document: kodexa.model.model.Document)[source]

Add a new document and return the index position

Returns:The index of the document added
count()[source]

The number of documents in the store

Returns:The number of documents
delete(idx: int)[source]

Delete the document at the given index

Returns:The Document that was removed
get(idx: int)[source]

Load the document at the given index

Returns:Document at given index
get_document(index: int)[source]

Gets the document from the specific index

Parameters:index – index of document to get
Returns:the document
load(document_id: str)[source]

Loads the document with the given document ID

Returns:the document

read_index()[source]

Method to read the document index from the store path

reset_connector()[source]

Reset the index back to the beginning

save_index()[source]

Method to write the JSON store index back to the store path
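The documents-as-JSON-files-plus-index layout described above can be sketched with plain dicts standing in for documents (illustrative only; the real JsonDocumentStore serializes kodexa Documents):

```python
# Sketch of a JSON-file document store that keeps an index.json,
# using plain dicts as documents. Not the kodexa JsonDocumentStore.
import json
import os
import tempfile
import uuid


class TinyJsonStore:
    def __init__(self, store_path):
        self.store_path = store_path
        os.makedirs(store_path, exist_ok=True)
        self.index = []  # document IDs, in insertion order

    def add(self, document):
        # Each document gets its own JSON file, named by a generated ID.
        doc_id = str(uuid.uuid4())
        with open(os.path.join(self.store_path, doc_id + ".json"), "w") as f:
            json.dump(document, f)
        self.index.append(doc_id)
        self.save_index()
        return len(self.index) - 1  # index position of the new document

    def get(self, idx):
        with open(os.path.join(self.store_path, self.index[idx] + ".json")) as f:
            return json.load(f)

    def save_index(self):
        with open(os.path.join(self.store_path, "index.json"), "w") as f:
            json.dump(self.index, f)


store = TinyJsonStore(tempfile.mkdtemp())
pos = store.add({"content": "page one"})
print(pos)                      # -> 0
print(store.get(0)["content"])  # -> page one
```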

class TableDataStore(columns=None, rows=None)[source]

Stores data as a list of lists that can represent a table.

This is a good store when you are capturing nested or tabular data.

Parameters:
  • columns – a list of the column names (default to dynamic)
  • rows – initial set of rows (default to empty)
add(row)[source]

Writes a row to the Data Store

Parameters:row – the row (as a list) to add
count()[source]

Returns the number of rows in the store

Returns:number of rows
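The list-of-lists behaviour described above can be sketched in a few lines (a hypothetical stand-in illustrating the shape of the store, not the kodexa TableDataStore):

```python
# Sketch of a list-of-lists table store with named columns.
# Stand-in for illustration; not kodexa's TableDataStore.
class TinyTableStore:
    def __init__(self, columns=None, rows=None):
        self.columns = columns or []  # column names (may stay empty)
        self.rows = rows or []        # each row is a list of values

    def add(self, row):
        self.rows.append(row)

    def count(self):
        return len(self.rows)


table = TinyTableStore(columns=["invoice_no", "total"])
table.add(["INV-001", 120.50])
table.add(["INV-002", 75.00])
print(table.count())  # -> 2
```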
class DictDataStore(dicts=None)[source]

Stores data as a list of dictionaries that can be any structure

This is a good store when you are capturing nested or semi-structured data

add(dict)[source]

Writes a dict to the Data Store

Parameters:dict – the dict to add to the store
count()[source]

Returns the number of dictionaries in the store

Returns:number of dictionaries
class NodeTagger(type_re, content_re, tag_name, use_all_content=True, node_only=False)[source]

A node tagger allows you to provide a type regular expression and a content regular expression, and then tag content in all matching nodes.

Multiple matching groups can be defined; you can also match against all of the node's content, or tag only the node itself (ignoring the matching groups).
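The type/content regex matching can be sketched with dicts standing in for content nodes (illustrative only; not the kodexa NodeTagger implementation):

```python
# Sketch of regex-based node tagging: match node type against one
# pattern, content against another, then record a tag as a feature.
# Stand-in for illustration; not kodexa's NodeTagger.
import re


def tag_nodes(nodes, type_re, content_re, tag_name):
    """nodes: list of dicts with 'type', 'content', 'features' keys."""
    for node in nodes:
        if re.fullmatch(type_re, node["type"]) and re.search(content_re, node["content"]):
            node["features"].append(("tag", tag_name))


nodes = [
    {"type": "line", "content": "Total: $120.50", "features": []},
    {"type": "line", "content": "Thank you", "features": []},
]
tag_nodes(nodes, r"line", r"\$\d+\.\d{2}", "amount")
print(nodes[0]["features"])  # -> [('tag', 'amount')]
print(nodes[1]["features"])  # -> []
```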

class TextParser(decode=False, encoding='utf-8')[source]

The text parser loads a source file as a text document, creating a single content node containing the text

class Rollup(collapse_type_res=[], reindex=True)[source]

The rollup step allows you to decide how you want to collapse content in a document by removing nodes while maintaining content and features as needed

class ExtractTagsToKeyValuePair(store_name, include=[], exclude=[], include_node_content=True)[source]

Extract all the tags from a document into a key/value pair table store

class JsonParser[source]

Parse JSON file into kodexa Document
