
How to create a custom Document Loader

Overview

Applications built on LLMs often need to extract data from databases or files, such as PDFs, and convert it into a format that LLMs can use. In LangChain, this usually involves creating Document objects, which encapsulate the extracted text (page_content) along with metadata: a dictionary containing details about the document, such as the author's name or the date of publication.

Document objects are often formatted into prompts that are fed into an LLM, allowing the LLM to use the information in the Document to generate a desired response (e.g., summarizing the document). Documents can be either used immediately or indexed into a vectorstore for future retrieval and use.
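As a rough illustration of how documents can end up in a prompt, here is a minimal sketch. It uses a small stand-in dataclass (Doc) instead of LangChain's Document so that it runs with the standard library alone; format_prompt and the prompt wording are hypothetical and not part of LangChain.

```python
from dataclasses import dataclass, field


@dataclass
class Doc:
    """Stand-in for LangChain's Document: text plus a metadata dict."""

    page_content: str
    metadata: dict = field(default_factory=dict)


def format_prompt(docs: list, question: str) -> str:
    """Hypothetical helper: number each document, then append the question."""
    context = "\n\n".join(
        f"[{i}] (source: {d.metadata.get('source', 'unknown')})\n{d.page_content}"
        for i, d in enumerate(docs, start=1)
    )
    return f"Answer using only the context below.\n\n{context}\n\nQuestion: {question}"


docs = [
    Doc("The loader reads one line at a time.", {"source": "notes.txt"}),
    Doc("Each line becomes its own document.", {"source": "notes.txt"}),
]
prompt = format_prompt(docs, "What does the loader do?")
print(prompt)
```

The metadata travels with the text, so the prompt can cite where each passage came from.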

The main abstractions for Document Loading are:

Component      | Description
Document       | Contains text and metadata
BaseLoader     | Used to convert raw data into Documents
Blob           | A representation of binary data that's located either in a file or in memory
BaseBlobParser | Logic to parse a Blob to yield Document objects

This guide will demonstrate how to write custom document loading and file parsing logic; specifically, we'll see how to:

  1. Create a standard document Loader by sub-classing from BaseLoader.
  2. Create a parser using BaseBlobParser and use it in conjunction with Blob and BlobLoaders. This is useful primarily when working with files.

Standard Document Loader

A document loader can be implemented by sub-classing BaseLoader, which provides a standard interface for loading documents.

Interface

Method Name | Explanation
lazy_load   | Used to load documents one by one lazily. Use for production code.
alazy_load  | Async variant of lazy_load
load        | Used to load all the documents into memory eagerly. Use for prototyping or interactive work.
aload       | Async variant of load. Use for prototyping or interactive work. Added in 2024-04 to LangChain.
  • The load method is a convenience method meant solely for prototyping work; it just invokes list(self.lazy_load()).
  • The alazy_load has a default implementation that will delegate to lazy_load. If you're using async, we recommend overriding the default implementation and providing a native async implementation.
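To make the idea of an async method delegating to a sync generator concrete, here is a minimal standard-library sketch. SketchLoader is a hypothetical stand-in, not a LangChain class, and LangChain's actual default implementation may differ in its details; the sketch only shows one way to pull items from a blocking generator without blocking the event loop.

```python
import asyncio
from typing import AsyncIterator, Iterator


class SketchLoader:
    """Stand-in loader (not a LangChain class) with a sync generator."""

    def lazy_load(self) -> Iterator[str]:
        for i in range(3):
            yield f"doc-{i}"

    async def alazy_load(self) -> AsyncIterator[str]:
        """Delegate to lazy_load, fetching each item in a worker thread."""
        loop = asyncio.get_running_loop()
        iterator = iter(self.lazy_load())
        done = object()  # sentinel returned by next() once the iterator is exhausted
        while True:
            item = await loop.run_in_executor(None, next, iterator, done)
            if item is done:
                break
            yield item


async def collect() -> list:
    return [doc async for doc in SketchLoader().alazy_load()]


result = asyncio.run(collect())
print(result)
```

A native async implementation avoids the thread hop entirely, which is why overriding the default is recommended when the underlying I/O has an async API.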
important

When implementing a document loader do NOT provide parameters via the lazy_load or alazy_load methods.

All configuration is expected to be passed through the initializer (__init__). This was a design choice made by LangChain to make sure that once a document loader has been instantiated, it has all the information needed to load documents.
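A small sketch of why this convention helps: when every option lives on the instance, generic code can call lazy_load() on any loader without knowing its options. LineLoader, its skip_blank option, and count_documents are hypothetical names used for illustration only; they are not LangChain classes.

```python
from typing import Iterator


class LineLoader:
    """Stand-in loader (hypothetical, not a LangChain class)."""

    def __init__(self, text: str, skip_blank: bool = True) -> None:
        # All configuration is stored on the instance.
        self.text = text
        self.skip_blank = skip_blank

    def lazy_load(self) -> Iterator[str]:
        # No parameters here: everything needed was given to __init__.
        for line in self.text.splitlines():
            if self.skip_blank and not line.strip():
                continue
            yield line


def count_documents(loader) -> int:
    """Generic caller: works with any object exposing lazy_load()."""
    return sum(1 for _ in loader.lazy_load())


strict = LineLoader("a\n\nb\n", skip_blank=True)
lenient = LineLoader("a\n\nb\n", skip_blank=False)
print(count_documents(strict), count_documents(lenient))
```

Both loaders are consumed through the same argument-free call, even though they were configured differently.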

Implementation

Let's create an example of a standard document loader that loads a file and creates a document from each line in the file.

from typing import AsyncIterator, Iterator

from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document


class CustomDocumentLoader(BaseLoader):
    """An example document loader that reads a file line by line."""

    def __init__(self, file_path: str) -> None:
        """Initialize the loader with a file path.

        Args:
            file_path: The path to the file to load.
        """
        self.file_path = file_path

    def lazy_load(self) -> Iterator[Document]:  # <-- Does not take any arguments
        """A lazy loader that reads a file line by line.

        When you're implementing lazy load methods, you should use a generator
        to yield documents one by one.
        """
        with open(self.file_path, encoding="utf-8") as f:
            line_number = 0
            for line in f:
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": self.file_path},
                )
                line_number += 1

    # alazy_load is OPTIONAL.
    # If you leave out the implementation, a default implementation which delegates to lazy_load will be used!
    async def alazy_load(
        self,
    ) -> AsyncIterator[Document]:  # <-- Does not take any arguments
        """An async lazy loader that reads a file line by line."""
        # Requires aiofiles
        # Install with `pip install aiofiles`
        # https://github.com/Tinche/aiofiles
        import aiofiles

        async with aiofiles.open(self.file_path, encoding="utf-8") as f:
            line_number = 0
            async for line in f:
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": self.file_path},
                )
                line_number += 1

Test 🧪

To test out the document loader, we need a file with some quality content.

with open("./meow.txt", "w", encoding="utf-8") as f:
    quality_content = "meow meow🐱 \n meow meow🐱 \n meow😻😻"
    f.write(quality_content)

loader = CustomDocumentLoader("./meow.txt")
%pip install -q aiofiles
## Test out the lazy load interface
for doc in loader.lazy_load():
    print()
    print(type(doc))
    print(doc)

<class 'langchain_core.documents.base.Document'>
page_content='meow meow🐱
' metadata={'line_number': 0, 'source': './meow.txt'}

<class 'langchain_core.documents.base.Document'>
page_content=' meow meow🐱
' metadata={'line_number': 1, 'source': './meow.txt'}

<class 'langchain_core.documents.base.Document'>
page_content=' meow😻😻' metadata={'line_number': 2, 'source': './meow.txt'}
## Test out the async implementation
async for doc in loader.alazy_load():
    print()
    print(type(doc))
    print(doc)

<class 'langchain_core.documents.base.Document'>
page_content='meow meow🐱
' metadata={'line_number': 0, 'source': './meow.txt'}

<class 'langchain_core.documents.base.Document'>
page_content=' meow meow🐱
' metadata={'line_number': 1, 'source': './meow.txt'}

<class 'langchain_core.documents.base.Document'>
page_content=' meow😻😻' metadata={'line_number': 2, 'source': './meow.txt'}
tip

load() can be helpful in an interactive environment such as a Jupyter notebook.

Avoid using it for production code since eager loading assumes that all the content can fit into memory, which is not always the case, especially for enterprise data.
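The difference between lazy and eager consumption can be seen in a short standard-library sketch. lazy_records is a hypothetical source standing in for a loader's lazy_load; the counter only tracks how many records were actually produced.

```python
from itertools import islice
from typing import Iterator

produced = 0  # counts how many records were actually generated


def lazy_records(n: int) -> Iterator[str]:
    """Hypothetical source that yields n records one at a time."""
    global produced
    for i in range(n):
        produced += 1
        yield f"record-{i}"


# Lazy: only the records that are consumed get produced.
first_three = list(islice(lazy_records(1_000_000), 3))
lazy_count = produced

# Eager: list() materializes every record before returning.
produced = 0
everything = list(lazy_records(10_000))
eager_count = produced

print(first_three, lazy_count, eager_count)
```

With the lazy pattern, memory use depends on how much the consumer holds on to at once rather than on the size of the source.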

loader.load()
[Document(metadata={'line_number': 0, 'source': './meow.txt'}, page_content='meow meow🐱 \n'),
Document(metadata={'line_number': 1, 'source': './meow.txt'}, page_content=' meow meow🐱 \n'),
Document(metadata={'line_number': 2, 'source': './meow.txt'}, page_content=' meow😻😻')]

Working with Files

Many document loaders involve parsing files. The difference between such loaders usually stems from how the file is parsed, rather than how the file is loaded. For example, you can use open to read the binary content of either a PDF or a markdown file, but you need different parsing logic to convert that binary data into text.

As a result, it can be helpful to decouple the parsing logic from the loading logic, which makes it easier to re-use a given parser regardless of how the data was loaded.
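One way to picture this decoupling, as a standard-library sketch rather than LangChain's API: a single parsing function that only sees bytes, combined with two different ways of obtaining those bytes. parse_lines and load_from_file are hypothetical helper names.

```python
import os
import tempfile
from typing import Iterator


def parse_lines(data: bytes, source: str) -> Iterator[dict]:
    """Parsing only: turn raw bytes into one record per line."""
    for number, line in enumerate(data.decode("utf-8").splitlines()):
        yield {"text": line, "line_number": number, "source": source}


def load_from_file(path: str) -> bytes:
    """Loading only: fetch raw bytes from disk."""
    with open(path, "rb") as f:
        return f.read()


# Same parser, two different ways of obtaining the bytes.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "notes.txt")
    with open(path, "wb") as f:
        f.write(b"first\nsecond")
    from_file = list(parse_lines(load_from_file(path), source=path))

from_memory = list(parse_lines(b"first\nsecond", source="memory"))

print([r["text"] for r in from_file] == [r["text"] for r in from_memory])
```

Because the parser never opens anything itself, swapping the storage backend does not require touching the parsing logic.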

BaseBlobParser

A BaseBlobParser is an interface that accepts a blob and outputs a list of Document objects. A blob is a representation of data that lives either in memory or in a file. LangChain Python has a Blob primitive, which is inspired by the Blob Web API spec.

from langchain_core.document_loaders import BaseBlobParser, Blob


class MyParser(BaseBlobParser):
    """A simple parser that creates a document from each line."""

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Parse a blob into a document line by line."""
        line_number = 0
        with blob.as_bytes_io() as f:
            for line in f:
                line_number += 1
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": blob.source},
                )
blob = Blob.from_path("./meow.txt")
parser = MyParser()
list(parser.lazy_parse(blob))
[Document(metadata={'line_number': 1, 'source': './meow.txt'}, page_content='meow meow🐱 \n'),
Document(metadata={'line_number': 2, 'source': './meow.txt'}, page_content=' meow meow🐱 \n'),
Document(metadata={'line_number': 3, 'source': './meow.txt'}, page_content=' meow😻😻')]

Using the blob API also allows one to load content directly from memory without having to read it from a file!

blob = Blob(data=b"some data from memory\nmeow")
list(parser.lazy_parse(blob))
[Document(metadata={'line_number': 1, 'source': None}, page_content='some data from memory\n'),
Document(metadata={'line_number': 2, 'source': None}, page_content='meow')]

Blob

Let's take a quick look through some of the Blob API.

blob = Blob.from_path("./meow.txt", metadata={"foo": "bar"})
blob.encoding
'utf-8'
blob.as_bytes()
b'meow meow\xf0\x9f\x90\xb1 \n meow meow\xf0\x9f\x90\xb1 \n meow\xf0\x9f\x98\xbb\xf0\x9f\x98\xbb'
blob.as_string()
'meow meow🐱 \n meow meow🐱 \n meow😻😻'
blob.as_bytes_io()
<contextlib._GeneratorContextManager at 0x7f89cf9336d0>
blob.metadata
{'foo': 'bar'}
blob.source
'./meow.txt'

Blob Loaders

While a parser encapsulates the logic needed to parse binary data into documents, blob loaders encapsulate the logic that's necessary to load blobs from a given storage location.

At the moment, LangChain supports FileSystemBlobLoader and CloudBlobLoader.

You can use the FileSystemBlobLoader to load blobs and then use the parser to parse them.

from langchain_community.document_loaders.blob_loaders import FileSystemBlobLoader

filesystem_blob_loader = FileSystemBlobLoader(
    path=".", glob="*.mdx", show_progress=True
)
%pip install -q tqdm
parser = MyParser()
for blob in filesystem_blob_loader.yield_blobs():
    for doc in parser.lazy_parse(blob):
        print(doc)
        break
0it [00:00, ?it/s]

Alternatively, you can use CloudBlobLoader to load blobs from a cloud storage location (the s3://, az://, gs://, and file:// schemes are supported).

%pip install -q cloudpathlib[s3]
Note: you may need to restart the kernel to use updated packages.
from langchain_community.document_loaders.blob_loaders import CloudBlobLoader

cloud_blob_loader = CloudBlobLoader(
    url="s3://mybucket",  # Supports s3://, az://, gs://, file:// schemes.
    glob="*.pdf",
    show_progress=True,
)
for blob in cloud_blob_loader.yield_blobs():
    print(blob)
Without valid credentials for the bucket, the call fails with an error such as:

ClientError: An error occurred (ExpiredToken) when calling the ListObjectsV2 operation: The provided token has expired.

Generic Loader

LangChain has a GenericLoader abstraction which composes a BlobLoader with a BaseBlobParser.

GenericLoader is meant to provide standardized classmethods that make it easy to use existing BlobLoader implementations. At the moment, the FileSystemBlobLoader and CloudBlobLoader are supported.
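The composition idea can be sketched with the standard library alone. ComposedLoader, in_memory_blobs, and line_parser are hypothetical stand-ins and not LangChain's GenericLoader, BlobLoader, or BaseBlobParser; the sketch only shows the shape of pairing "where the bytes come from" with "how they are parsed".

```python
from typing import Callable, Iterable, Iterator, Tuple

RawBlob = Tuple[str, bytes]  # (source, data); a stand-in for Blob


class ComposedLoader:
    """Hypothetical stand-in showing the composition idea only."""

    def __init__(
        self,
        blob_source: Callable[[], Iterable[RawBlob]],
        parser: Callable[[RawBlob], Iterator[str]],
    ) -> None:
        self.blob_source = blob_source
        self.parser = parser

    def lazy_load(self) -> Iterator[str]:
        # Outer loop: where the bytes come from. Inner loop: how they are parsed.
        for blob in self.blob_source():
            yield from self.parser(blob)


def in_memory_blobs() -> Iterable[RawBlob]:
    return [("a.txt", b"one\ntwo"), ("b.txt", b"three")]


def line_parser(blob: RawBlob) -> Iterator[str]:
    source, data = blob
    for line in data.decode("utf-8").splitlines():
        yield f"{source}: {line}"


composed = ComposedLoader(in_memory_blobs, line_parser)
lines = list(composed.lazy_load())
print(lines)
```

Either half can be replaced independently: a different blob source with the same parser, or a different parser over the same blobs.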

from langchain_community.document_loaders.generic import GenericLoader

generic_loader_filesystem = GenericLoader(
    blob_loader=filesystem_blob_loader, blob_parser=parser
)
# Iterate over the composed loader.
for idx, doc in enumerate(generic_loader_filesystem.lazy_load()):
    if idx < 5:
        print(doc)

print("... output truncated for demo purposes")
from langchain_community.document_loaders.generic import GenericLoader

generic_loader_cloud = GenericLoader(blob_loader=cloud_blob_loader, blob_parser=parser)
# Iterate over the composed loader.
for idx, doc in enumerate(generic_loader_cloud.lazy_load()):
    if idx < 5:
        print(doc)

print("... output truncated for demo purposes")

Custom Generic Loader

If you prefer working with classes, you can sub-class GenericLoader and override get_parser so that the loader and its default parser are encapsulated together.

The resulting class can then load content through the existing blob loaders, for example via from_filesystem.

from typing import Any


class MyCustomLoader(GenericLoader):
    @staticmethod
    def get_parser(**kwargs: Any) -> BaseBlobParser:
        """Override this method to associate a default parser with the class."""
        return MyParser()


loader = MyCustomLoader.from_filesystem(path=".", glob="*.mdx", show_progress=True)

for idx, doc in enumerate(loader.lazy_load()):
    if idx < 5:
        print(doc)

print("... output truncated for demo purposes")
0it [00:00, ?it/s]
... output truncated for demo purposes
