Documentation
`fns` is a collection of Python functions re-usable across ML projects.
Installation
pip install fns
Prototyping helpers
- Import the most common Python stdlib and data science functions into the current session.
from fns.all import *
Functions
array_except_element(arr, elem)
Get copy of array without an element.
Parameters:

Name | Type | Description | Default
---|---|---|---
arr | List | | required
elem | Any | | required

Returns:

Type | Description
---|---
List | Array
Examples:
>>> array_except_element([1, 2, 3], 3)
[1, 2]
Source code in fns/fns.py
def array_except_element(arr: List, elem: Any) -> List:
"""
Get copy of array without an element.
Args:
arr:
elem:
Returns:
Array
Example:
```python
>>> array_except_element([1, 2, 3], 3)
[1, 2]
```
"""
elem_index = arr.index(elem)
return arr[:elem_index] + arr[elem_index + 1 :]
base64_dict(base64_str)
Parse a base64-encoded JSON as dictionary.
Parameters:

Name | Type | Description | Default
---|---|---|---
base64_str | str | Base-64 encoded string representation of JSON | required

Returns:

Type | Description
---|---
Dict | Python Dictionary
Source code in fns/fns.py
def base64_dict(base64_str: str) -> Dict:
"""
Parse a base64-encoded JSON as dictionary.
Args:
base64_str: Base-64 encoded string representation of JSON
Returns:
Python Dictionary
"""
return json.loads(base64.b64decode(base64_str))
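Example (illustrative sketch, not part of the original docs; assumes the function is available via `from fns.all import *`):
```python
>>> import base64, json
>>> encoded = base64.b64encode(json.dumps({"a": 1}).encode()).decode()
>>> base64_dict(encoded)
{'a': 1}
```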
flatten(x)
Flatten a list of list.
Parameters:

Name | Type | Description | Default
---|---|---|---
x | List[List] | List of list of elements | required

Returns:

Type | Description
---|---
Iterator | Iterator of flattened array.
Source code in fns/fns.py
def flatten(x: List[List]) -> Iterator:
"""
Flatten a list of list.
Args:
x: List of list of elements
Returns:
Iterator of flattened array.
"""
return itertools.chain.from_iterable(x)
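Example (illustrative; the return value is an iterator, so it is wrapped in `list` here):
```python
>>> list(flatten([[1, 2], [3], [4, 5]]))
[1, 2, 3, 4, 5]
```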
format_as_hms(seconds)
Convert seconds to HH:MM:SS format.
Parameters:

Name | Type | Description | Default
---|---|---|---
seconds | Union[int, float] | Number of seconds | required

Returns:

Type | Description
---|---
str | String in the format HH:MM:SS
Source code in fns/fns.py
def format_as_hms(seconds: Union[int, float]) -> str:
"""
Convert seconds to HH:MM:SS format.
Args:
seconds: Number of seconds
Returns:
String in the format HH:MM:SS
"""
return time.strftime("%H:%M:%S", time.gmtime(seconds))
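Example (illustrative, not from the original docs):
```python
>>> format_as_hms(3661)
'01:01:01'
>>> format_as_hms(45296)
'12:34:56'
```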
generate_edits(word, n=1)
Generate variations that are `n` edits away from word.
Adapted from: https://norvig.com/spell-correct.html
Parameters:

Name | Type | Description | Default
---|---|---|---
word | str | Single word | required
n | int | Number of edits away from word. | 1

Returns:

Type | Description
---|---
List[str] | List of edits
Source code in fns/fns.py
def generate_edits(word: str, n: int = 1) -> List[str]:
"""
Generate variations that are `n` edits away from word.
Adapted from: https://norvig.com/spell-correct.html
Args:
word: Single word
n: Number of edits away from word.
Returns:
List of edits
"""
def edits1(word: str):
letters = "abcdefghijklmnopqrstuvwxyz"
splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]
deletes = [L + R[1:] for L, R in splits if R]
transposes = [L + R[1] + R[0] + R[2:] for L, R in splits if len(R) > 1]
replaces = [L + c + R[1:] for L, R in splits if R for c in letters]
inserts = [L + c + R for L, R in splits for c in letters]
return set(deletes + transposes + replaces + inserts)
edits = edits1(word)
for i in range(n - 1):
edits = [e2 for e1 in edits for e2 in edits1(e1)]
return edits
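Example (illustrative sketch based on the source above; membership checks work for the default `n=1`):
```python
>>> edits = generate_edits("cat", n=1)
>>> "cart" in edits  # one insertion away from "cat"
True
>>> "cta" in edits   # one transposition away
True
```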
harmonic_mean(a, b)
Compute harmonic mean of two numbers.
Parameters:

Name | Type | Description | Default
---|---|---|---
a | Union[int, float] | First number | required
b | Union[int, float] | Second number | required

Returns:

Type | Description
---|---
Union[int, float] | Harmonic mean
Source code in fns/fns.py
def harmonic_mean(a: Union[int, float], b: Union[int, float]) -> Union[int, float]:
"""
Compute harmonic mean of two numbers.
Args:
a: First number
b: Second number
Returns:
Harmonic mean
"""
return (2 * a * b) / (a + b)
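Example (illustrative):
```python
>>> harmonic_mean(2, 6)
3.0
```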
hash_file(file_object)
Calculate MD5 hash of file.
Parameters:

Name | Type | Description | Default
---|---|---|---
file_object | IO | File object | required

Returns:

Type | Description
---|---
| MD5 hash of the file
Source code in fns/fns.py
def hash_file(file_object: IO):
"""
Calculate MD5 hash of file.
Args:
file_object: File object
Returns:
MD5 hash of the file
"""
# Calculate hash
unique_hash = md5_hash(file_object.read())
# Reset file pointer to start
file_object.seek(0)
return unique_hash
minibatch(items, size)
Create mini-batches of length 'size' from a list of items.
Original Source: `spacy` package
Original function definition: https://github.com/explosion/spaCy/blob/master/spacy/util.py#L1426
Source code in fns/fns.py
def minibatch(items, size):
"""
Create mini-batches of length 'size' from a list of items.
Original Source: `spacy` package
Original function definition:
https://github.com/explosion/spaCy/blob/master/spacy/util.py#L1426
"""
if isinstance(size, int):
size_ = itertools.repeat(size)
else:
size_ = size
items = iter(items)
while True:
batch_size = next(size_)
batch = list(itertools.islice(items, int(batch_size)))
if len(batch) == 0:
break
yield list(batch)
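Example (illustrative; `minibatch` is a generator, so it is wrapped in `list` here):
```python
>>> list(minibatch([1, 2, 3, 4, 5], size=2))
[[1, 2], [3, 4], [5]]
```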
ngrams(tokens, n)
Parameters:

Name | Type | Description | Default
---|---|---|---
tokens | List | List of elements | required
n | int | N-gram size | required

Returns:

Type | Description
---|---
| List of ngrams
Source code in fns/fns.py
def ngrams(tokens: List, n: int):
"""
Args:
tokens: List of elements
n: N-gram size
Returns:
List of ngrams
"""
return [tokens[i : i + n] for i in range(len(tokens) - n + 1)]
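Example (illustrative):
```python
>>> ngrams(["the", "quick", "brown", "fox"], n=2)
[['the', 'quick'], ['quick', 'brown'], ['brown', 'fox']]
```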
num_files(path)
Get the number of files in a path.
Parameters:

Name | Type | Description | Default
---|---|---|---
path | Union[pathlib.Path, str] | File path | required

Returns:

Type | Description
---|---
int | Number of files
Source code in fns/fns.py
def num_files(path: Union[Path, str]) -> int:
"""
Get the number of files in a path.
Args:
path: File path
Returns:
Number of files
"""
return len(os.listdir(path))
parse_manual(parser, command)
Use argument parser in notebooks.
Parameters:

Name | Type | Description | Default
---|---|---|---
parser | ArgumentParser | ArgumentParser | required
command | str | Command line arguments as string | required

Returns:

Type | Description
---|---
Namespace | Parsed argument as namespace
Source code in fns/fns.py
def parse_manual(parser: argparse.ArgumentParser, command: str) -> argparse.Namespace:
"""
Use argument parser in notebooks.
Args:
parser: ArgumentParser
command: Command line arguments as string
Returns:
Parsed argument as namespace
"""
args = command.split()
return parser.parse_args(args=args)
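Example (illustrative sketch with a hypothetical `--lr` argument):
```python
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> _ = parser.add_argument("--lr", type=float)
>>> parse_manual(parser, "--lr 0.01")
Namespace(lr=0.01)
```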
percent_dict(d)
Convert a dictionary of key-value to key:coverage-percent.
Parameters:

Name | Type | Description | Default
---|---|---|---
d | Dict | Dictionary of key and values | required

Returns:

Type | Description
---|---
Dict | Dictionary of key and percent-coverage
Source code in fns/fns.py
def percent_dict(d: Dict) -> Dict:
"""
Convert a dictionary of key-value to key:coverage-percent.
Args:
d: Dictionary of key and values
Returns:
Dictionary of key and percent-coverage
"""
total = sum(d.values())
return {key: value / total * 100.0 for key, value in d.items()}
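Example (illustrative):
```python
>>> percent_dict({"train": 80, "test": 20})
{'train': 80.0, 'test': 20.0}
```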
print_json(d)
Render python dictionary as JSON with double quotes and indentation.
Parameters:

Name | Type | Description | Default
---|---|---|---
d | Dict | Python dictionary | required

Returns:

Type | Description
---|---
None | None
Source code in fns/fns.py
def print_json(d: Dict) -> None:
"""
Render python dictionary as JSON with double quotes and indentation.
Args:
d: Python dictionary
Returns:
None
"""
print(json.dumps(d, indent=4))
read_as_base64(path)
Convert file contents into a base64 string
Parameters:

Name | Type | Description | Default
---|---|---|---
path | Union[str, pathlib.Path] | File path | required

Returns:

Type | Description
---|---
str | Base64 string
Source code in fns/fns.py
def read_as_base64(path: Union[str, Path]) -> str:
"""
Convert file contents into a base64 string
Args:
path: File path
Returns:
Base64 string
"""
content = Path(path).read_text()
return base64.b64encode(content.encode("utf-8")).decode("utf-8")
read_json(json_path)
Read json file from a path.
Parameters:

Name | Type | Description | Default
---|---|---|---
json_path | Union[str, pathlib.Path] | File path to a json file. | required

Returns:

Type | Description
---|---
Dict | Python dictionary
Source code in fns/fns.py
def read_json(json_path: Union[str, Path]) -> Dict:
"""
Read json file from a path.
Args:
json_path: File path to a json file.
Returns:
Python dictionary
"""
with open(json_path, "r") as fp:
data = json.load(fp)
return data
read_pickle(path)
Read a pickle file from path.
Parameters:

Name | Type | Description | Default
---|---|---|---
path | Union[str, pathlib.Path] | File path | required

Returns:

Type | Description
---|---
Any | Unpickled object
Source code in fns/fns.py
def read_pickle(path: Union[str, Path]) -> Any:
"""
Read a pickle file from path.
Args:
path: File path
Returns:
Unpickled object
"""
with open(path, "rb") as fp:
return pickle.load(fp)
reverse_mapping(d)
Swap mapping from key: value to value: key
Parameters:

Name | Type | Description | Default
---|---|---|---
d | Dict | Python Dictionary | required

Returns:

Type | Description
---|---
Dict | Dictionary with key and value swapped
Source code in fns/fns.py
def reverse_mapping(d: Dict) -> Dict:
"""
Swap mapping from key: value to value: key
Args:
d: Python Dictionary
Returns:
Dictionary with key and value swapped
"""
return {v: k for k, v in d.items()}
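Example (illustrative):
```python
>>> reverse_mapping({"positive": 1, "negative": 0})
{1: 'positive', 0: 'negative'}
```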
roundup(n, m=10)
Round up a number `n` to the nearest multiple of `m`.
Parameters:

Name | Type | Description | Default
---|---|---|---
n | float | Number | required
m | int | Multiple of which number to roundup to | 10

Returns:

Type | Description
---|---
int | Rounded integer number
Source code in fns/fns.py
def roundup(n: float, m: int = 10) -> int:
"""
Round up a number n to the nearest multiple of M.
Args:
n: Number
m: Multiple of which number to roundup to
Returns:
Rounded integer number
"""
return int(math.ceil(n / m)) * m
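Example (illustrative):
```python
>>> roundup(41)
50
>>> roundup(7, m=5)
10
```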
sort_dict_by_value(d, reverse=False)
Sort items in dictionary by value.
Examples:
>>> sort_dict_by_value({'gold': 40, 'silver': 25})
{'silver': 25, 'gold': 40}
Parameters:

Name | Type | Description | Default
---|---|---|---
d | Dict | Python Dictionary | required
reverse | bool | Sort order | False

Returns:

Type | Description
---|---
Dict | Sorted dictionary
Source code in fns/fns.py
def sort_dict_by_value(d: Dict, reverse: bool = False) -> Dict:
"""
Sort items in dictionary by value.
Example:
```python
>>> sort_dict_by_value({'gold': 40, 'silver': 25})
{'silver': 25, 'gold': 40}
```
Args:
d: Python Dictionary
reverse: Sort order
Returns:
Sorted dictionary
"""
return dict(sorted(d.items(), key=lambda item: item[1], reverse=reverse))
top(data, n=5)
Get a dictionary of top-n items from a list.
Parameters:

Name | Type | Description | Default
---|---|---|---
data | | Python collection | required
n | int | Number of top-values | 5

Returns:

Type | Description
---|---
Dict | Dictionary of top-n items and count
Source code in fns/fns.py
def top(data, n: int = 5) -> Dict:
"""
Get a dictionary of top-n items from a list.
Args:
data: Python collection
n: Number of top-values
Returns:
Dictionary of top-n items and count
"""
return dict(Counter(data).most_common(n))
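Example (illustrative):
```python
>>> top(["a", "b", "a", "c", "a", "b"], n=2)
{'a': 3, 'b': 2}
```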
top_n_from_dict(dictionary, n=10)
Get top n largest values from the dictionary.
Parameters:

Name | Type | Description | Default
---|---|---|---
dictionary | Dict | Python dictionary | required
n | int | Number of keys to pick | 10
Source code in fns/fns.py
def top_n_from_dict(dictionary: Dict, n: int = 10):
"""
Get top n largest values from the dictionary.
Args:
dictionary: Python dictionary
n: Number of keys to pick
Returns:
"""
return top(dictionary, n=n)
write_json(item, path, mode='w')
Save json to a file.
Parameters:

Name | Type | Description | Default
---|---|---|---
item | Dict | Python dictionary | required
path | Union[pathlib.Path, str] | File path to save at | required
mode | str | File write mode | 'w'

Returns:

Type | Description
---|---
None | None
Source code in fns/fns.py
def write_json(item: Dict, path: Union[Path, str], mode: str = "w") -> None:
"""
Save json to a file.
Args:
item: Python dictionary
path: File path to save at
mode: File write mode
Returns:
None
"""
with open(path, mode=mode) as fp:
json.dump(item, fp)
write_pickle(item, path)
Pickle a python object.
Parameters:

Name | Type | Description | Default
---|---|---|---
item | Any | Python object | required
path | Union[pathlib.Path, str] | File path to save the pickle file | required

Returns:

Type | Description
---|---
None | None
Source code in fns/fns.py
def write_pickle(item: Any, path: Union[Path, str]) -> None:
"""
Pickle a python object.
Args:
item: Python object
path: File path to save the pickle file
Returns:
None
"""
with open(path, "wb") as fp:
pickle.dump(item, fp)
Functions
cluster_text(texts, return_dataframe=True, n=None)
Quickly cluster a list of sentences for EDA.
Parameters:

Name | Type | Description | Default
---|---|---|---
n | int | Number of clusters | None
texts | List[str] | List of sentences | required
return_dataframe | bool | Whether to return as dataframe or a list of cluster labels | True

Returns:

Type | Description
---|---
Union[pandas.core.frame.DataFrame, List[int]] |
Source code in fns/cluster.py
def cluster_text(
texts: List[str], return_dataframe: bool = True, n: int = None
) -> Union[pd.DataFrame, List[int]]:
"""
Quickly cluster a list of sentences for EDA.
Args:
n: Number of clusters
texts: List of sentences
return_dataframe: Whether to return as dataframe or a list of cluster labels
Returns:
"""
n = n or n_clusters(texts)
hybrid_tfidf = FeatureUnion(
[
(
"word_tfidf",
TfidfVectorizer(
ngram_range=(1, 2),
strip_accents="unicode",
analyzer="word",
stop_words="english",
sublinear_tf=True,
),
),
(
"char_tfidf",
TfidfVectorizer(
ngram_range=(3, 3),
strip_accents="unicode",
analyzer="char_wb",
stop_words="english",
sublinear_tf=True,
),
),
]
)
cluster_pipeline = make_pipeline(
hybrid_tfidf,
FunctionTransformer(lambda x: x.todense(), accept_sparse=True, validate=False),
PCA(0.9, random_state=0),
KMeans(n, random_state=0),
)
clusters = cluster_pipeline.fit_predict(texts)
if return_dataframe:
return (
pd.DataFrame({"text": texts, "cluster": clusters})
.assign(
cluster_size=lambda d: d["cluster"].map(d["cluster"].value_counts())
)
.sort_values(
by=["cluster_size", "cluster", "text"], ascending=[False, True, True]
)
.drop(columns=["cluster_size"])
)
else:
return clusters
similarity_sort(texts)
Sort a list of sentences such that similar sentences are placed consecutively.
Parameters:

Name | Type | Description | Default
---|---|---|---
texts | List[str] | List of sentences | required

Returns:

Type | Description
---|---
List[str] | Sorted sentences
Source code in fns/cluster.py
def similarity_sort(texts: List[str]) -> List[str]:
"""
Sort list of sentences such that similar sentence are placed consecutively.
Args:
texts: List of sentences
Returns:
Sorted sentences
"""
df = cluster_text(texts, n=len(texts) // 2)
return df["text"].tolist()
Functions
expose_port(port, path='/')
Expose port as an external URL.
The URL is only accessible to you and remains available only while the notebook is running.
Parameters:

Name | Type | Description | Default
---|---|---|---
port | int | Port a service is running on | required
path | str | Path the service is running on | '/'

Returns:

Type | Description
---|---
None | None
Source code in fns/colab.py
def expose_port(port: int, path: str = "/") -> None:
"""
Expose port as an external URL.
The URL is only accessible to you and available till the notebook runs.
Args:
port: Port a service is running on
path: Path the service is running on
Returns:
None
"""
output = import_module("google.colab.output")
output.serve_kernel_port_as_window(port, path=path)
jupyter(subdomain, port=9003)
Start a jupyter notebook server using localtunnel.
Returns:

Type | Description
---|---
None | None
Source code in fns/colab.py
def jupyter(subdomain: str, port: int = 9003) -> None:
"""
Start a jupyter notebook server using localtunnel.
Returns:
None
"""
command = f"jupyter-notebook --ip='*' --no-browser --allow-root --port 9003 & npx localtunnel -p {port} -s {subdomain} --allow-invalid-cert"
run_foreground(command)
run_background(command)
Run a bash command in background.
Parameters:

Name | Type | Description | Default
---|---|---|---
command | str | Bash command | required

Returns:

Type | Description
---|---
None | None
Source code in fns/colab.py
def run_background(command: str) -> None:
"""
Run a bash command in background.
Args:
command: Bash command
Returns:
None
"""
subprocess.Popen(command, shell=True)
run_foreground(cmd)
Run a bash command in foreground.
Reference: http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html
Parameters:

Name | Type | Description | Default
---|---|---|---
cmd | str | Bash command | required

Returns:

Type | Description
---|---
None | None
Source code in fns/colab.py
def run_foreground(cmd: str) -> None:
"""
Run a bash command in foreground.
Reference: http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html
Args:
cmd: Bash command
Returns:
None
"""
p = subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
while True:
line = p.stdout.readline()
print(line.strip())
if line == "" and p.poll() is not None:
break
return None
vscode(subdomain='amitness', port=9000, config_save_path='/content/drive/MyDrive/colab/.vscode')
Start VSCode server which persists all settings and extensions.
Parameters:

Name | Type | Description | Default
---|---|---|---
subdomain | str | Subdomain for localtunnel. | 'amitness'
port | int | Port for running code-server | 9000
config_save_path | str | Path in Google Drive to save VSCode settings | '/content/drive/MyDrive/colab/.vscode'

Returns:

Type | Description
---|---
None | None
Source code in fns/colab.py
def vscode(
subdomain: str = "amitness",
port: int = 9000,
config_save_path: str = "/content/drive/MyDrive/colab/.vscode",
) -> None:
"""
Start VSCode server which persists all settings and extensions.
Args:
subdomain: Subdomain for localtunnel.
port: Port for running code-server
config_save_path: Path in Google Drive to save VSCode settings
Returns:
None
"""
drive = import_module("google.colab.drive")
drive.mount("/content/drive")
subprocess.run(["curl", "-fsSL", "https://code-server.dev/install.sh", "-O"])
subprocess.run(["bash", "install.sh", "--version", "3.10.2"])
subprocess.run(["pip3", "install", "flake8", "--user"])
subprocess.run(["pip3", "install", "black", "--user"])
print(f"https://{subdomain}.loca.lt/?folder=/content/drive/MyDrive/colab")
run_foreground(
f"code-server --port {port} --auth none --disable-telemetry --force --user-data-dir {config_save_path} & npx localtunnel -p {port} -s {subdomain} --allow-invalid-cert"
)
Functions
display_all()
Show all the rows and columns when printing dataframe.
Returns:

Type | Description
---|---
None | None
Source code in fns/dataframe.py
def display_all() -> None:
"""
Show all the rows and columns when printing dataframe.
Returns:
None
"""
import pandas as pd
pd.set_option("display.max_colwidth", None)
pd.set_option("display.max_rows", None)
explore_df(df)
Perform a quick peek of a dataframe.
Currently shows:
- Number of null elements in each column
- Data type of each column
- One example value for each column
Parameters:

Name | Type | Description | Default
---|---|---|---
df | DataFrame | Pandas DataFrame | required

Returns:

Type | Description
---|---
DataFrame | DataFrame with summary infos
Source code in fns/dataframe.py
def explore_df(df: pd.DataFrame) -> pd.DataFrame:
"""
Perform a quick peek of a dataframe.
Currently shows:
- Number of null elements in each column
- Data type of each column
- One example data for each column
Args:
df: Pandas DataFrame
Returns:
DataFrame with summary infos
"""
null_df = pd.DataFrame(df.isnull().sum(), columns=["num_nulls"])
dtype_df = pd.DataFrame(df.dtypes, columns=["dtype"])
return df.T.sample(1, axis=1).join([dtype_df, null_df]).rename_axis("Columns")
fake_df()
Generate a dataframe filled with random data.
Returns:

Type | Description
---|---
DataFrame | Pandas DataFrame
Source code in fns/dataframe.py
def fake_df() -> pd.DataFrame:
"""
Generate a dataframe filled with random data.
Returns:
Pandas DataFrame
"""
return pd.util.testing.makeDataFrame()
is_outlier(values)
Generate a mask indicating whether each element is an outlier.
Extra:
Condition 1: < Q1 - 1.5 * IQR
Condition 2: > Q3 + 1.5 * IQR
Parameters:

Name | Type | Description | Default
---|---|---|---
values | List | List of numerical values | required

Returns:

Type | Description
---|---
List[bool] | List of boolean indicating if an element is outlier or not
Source code in fns/dataframe.py
def is_outlier(values: List) -> List[bool]:
"""
Generate a mask if an element is an outlier or not.
Extra:
```
Condition 1: < Q1 - 1.5 * IQR
Condition 2: > Q3 + 1.5 * IQR
```
Args:
values: List of numerical values
Returns:
List of boolean indicating if an element is outlier or not
"""
q1 = np.quantile(values, 0.25)
q3 = np.quantile(values, 0.75)
iqr = q3 - q1
lower_threshold = q1 - 1.5 * iqr
upper_threshold = q3 + 1.5 * iqr
return (values < lower_threshold) | (values > upper_threshold)
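Example (illustrative; the element-wise comparisons in the source above need an array-like input such as a NumPy array or pandas Series rather than a plain list):
```python
>>> import numpy as np
>>> is_outlier(np.array([1, 2, 3, 100]))
array([False, False, False,  True])
```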
no_wrapping()
Return a context manager to display all rows and columns.
Examples:
with no_wrapping():
print(df)
Returns:

Type | Description
---|---
| Context Manager
Source code in fns/dataframe.py
def no_wrapping():
"""
Return a context manager to display all rows and columns.
Examples:
```python
with no_wrapping():
print(df)
```
Returns:
Context Manager
"""
return pd.option_context("display.max_rows", None, "display.max_columns", None)
print_groups(df, column)
Pretty print all subsets of a groupby.
Parameters:

Name | Type | Description | Default
---|---|---|---
df | DataFrame | Pandas DataFrame | required
column | str | Column Name to group by | required

Returns:

Type | Description
---|---
None | None
Source code in fns/dataframe.py
def print_groups(df: pd.DataFrame, column: str) -> None:
"""
Pretty print all subsets of a groupby.
Args:
df: Pandas DataFrame
column: Column Name to group by
Returns:
None
"""
for current_group, sub_df in df.groupby(column):
print(f"Group: {current_group}")
print()
# Skip group column
mask = ~(sub_df.columns.isin([column]))
print(sub_df.loc[:, mask])
print()
print("---" * 25)
read_dict(data)
Create a dataframe from a dictionary whose values have unequal lengths.
Parameters:

Name | Type | Description | Default
---|---|---|---
data | Dict | Dictionary with column names as keys and rows as values | required

Returns:

Type | Description
---|---
DataFrame | Pandas DataFrame
Source code in fns/dataframe.py
def read_dict(data: Dict) -> pd.DataFrame:
"""
Create a dataframe from dictionary with unequal elements.
Args:
data: Dictionary with column names as keys and rows as values
Returns:
Pandas DataFrame
"""
return pd.DataFrame.from_dict(data, orient="index").transpose()
to_excel(path, df, sheet_name, index=False, mode='a')
Add a dataframe to an existing Excel file.
Parameters:

Name | Type | Description | Default
---|---|---|---
path | Union[pathlib.Path, str] | Path of the excel file | required
df | DataFrame | Pandas DataFrame | required
sheet_name | str | The sheet name to save in | required
index | bool | Keep or remove index | False
mode | str | 'a' for append or 'w' for write | 'a'

Returns:

Type | Description
---|---
None | None
Source code in fns/dataframe.py
def to_excel(
path: Union[Path, str],
df: pd.DataFrame,
sheet_name: str,
index: bool = False,
mode: str = "a",
) -> None:
"""
Add a dataframe to an existing Excel file.
Args:
path: Path of the excel file
df: Pandas DataFrame
sheet_name: The sheet name to save in
index: Keep or remove index
mode: 'a' for append or 'w' for write
Returns:
None
"""
with pd.ExcelWriter(path, mode=mode) as writer:
df.to_excel(writer, sheet_name=sheet_name, index=index)
Functions
batched(batch_size=32)
Apply a function over small batches of a list and combine results.
Parameters:

Name | Type | Description | Default
---|---|---|---
batch_size | int | Size of each mini-batch | 32

Returns:

Type | Description
---|---
Callable | Decorator for the batch size
Source code in fns/decorators.py
def batched(batch_size: int = 32) -> Callable:
"""
Apply a function over small batches of a list and combine results.
Args:
batch_size: Size of each mini-batch
Returns:
Decorator for the batch size
"""
def decorator(func) -> Callable:
@functools.wraps(func)
def inner(*args, **kwargs):
items = args[0]
results = []
for batch in minibatch(items, batch_size):
batch_results = func(batch)
results.extend(batch_results)
return results
return inner
return decorator
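Example (illustrative sketch with a hypothetical `double` function):
```python
>>> @batched(batch_size=2)
... def double(items):
...     return [x * 2 for x in items]
>>> double([1, 2, 3, 4, 5])
[2, 4, 6, 8, 10]
```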
deduplicate(func)
Decorator to deduplicate results of a function.
Usage:
@deduplicate
def test():
return [1, 2, 3, 1]
Parameters:

Name | Type | Description | Default
---|---|---|---
func | Callable | Function | required

Returns:

Type | Description
---|---
Callable | Function
Source code in fns/decorators.py
def deduplicate(func: Callable) -> Callable:
"""
Decorator to deduplicate results of a function.
Usage:
```python
@deduplicate
def test():
return [1, 2, 3, 1]
```
Args:
func: Function
Returns:
Function
"""
@functools.wraps(func)
def inner(*args, **kwargs):
return list(set(func(*args, **kwargs)))
return inner
named_timer(func)
Decorator to store time taken for wrapped functions.
Parameters:

Name | Type | Description | Default
---|---|---|---
func | Callable | Python Function | required

Returns:

Type | Description
---|---
Callable | Decorated function
Source code in fns/decorators.py
def named_timer(func: Callable) -> Callable:
"""
Decorator to store time taken for wrapped functions.
Args:
func: Python Function
Returns:
Decorated function
"""
@functools.wraps(func)
def inner(*args, **kwargs):
start_time = time.perf_counter()
value = func(*args, **kwargs)
name = func.__name__
named_timer.times[name] = time.perf_counter() - start_time
return value
return inner
show_shapes(func)
Decorator to log dataframe shape before and after applying a function.
Parameters:

Name | Type | Description | Default
---|---|---|---
func | Callable | Function that takes a dataframe as argument | required

Returns:

Type | Description
---|---
Callable | function
Source code in fns/decorators.py
def show_shapes(func: Callable) -> Callable:
"""
Decorator to log dataframe shape before and after applying a function.
Args:
func: Function that takes a dataframe as argument
Returns:
function
"""
@functools.wraps(func)
def inner(df):
print(f"Shape before {func.__name__}", df.shape)
out_df = func(df)
print(f"Shape after {func.__name__}", out_df.shape)
return out_df
return inner
timeit(func)
Decorator to calculate time taken for a function to complete.
Parameters:

Name | Type | Description | Default
---|---|---|---
func | Callable | Python Function | required

Returns:

Type | Description
---|---
Callable | Decorated function
Source code in fns/decorators.py
def timeit(func: Callable) -> Callable:
"""
Decorator to calculate time taken for a function to complete.
Args:
func: Python Function
Returns:
Decorated function
"""
start_time = time.time()
@functools.wraps(func)
def inner(*args, **kwargs):
func(*args, **kwargs)
total_time_taken = time.time() - start_time
print("Total time taken: {} seconds".format(total_time_taken))
return inner
to(data_type)
Apply a data type to returned data from a function.
Parameters:

Name | Type | Description | Default
---|---|---|---
data_type | | The data type to apply. Eg: list, int etc. | required

Returns:

Type | Description
---|---
Callable | Decorator that applies the data type on returned data
Source code in fns/decorators.py
def to(data_type) -> Callable:
"""
Apply a data type to returned data from a function.
Args:
data_type: The data type to apply. Eg: list, int etc.
Returns:
Decorator that applies the data type on returned data
"""
def decorator(func) -> Callable:
@functools.wraps(func)
def inner(*args, **kwargs):
return data_type(func(*args, **kwargs))
return inner
return decorator
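Example (illustrative sketch with a hypothetical `first_n` function):
```python
>>> @to(list)
... def first_n(n):
...     return range(n)
>>> first_n(3)
[0, 1, 2]
```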
Functions
dict_words()
Fetch default list of words present in Linux distros.
Returns:

Type | Description
---|---
List[str] | List of words
Source code in fns/lexicon.py
def dict_words() -> List[str]:
"""
Fetch default list of words present in Linux distros.
Returns:
List of words
"""
return Path("/usr/share/dict/words").read_text().splitlines()
onegram_count()
Get counts of 1-gram from Peter Norvig's list.
Returns:

Type | Description
---|---
DataFrame | DataFrame with one-gram, count and idf scores.
Source code in fns/lexicon.py
@lru_cache(1)
def onegram_count() -> pd.DataFrame:
"""
Get counts of 1-gram from Peter Norvig's list.
Returns:
DataFrame with one-gram, count and idf scores.
"""
df = pd.read_csv(
"https://norvig.com/ngrams/count_1w.txt",
sep="\t",
header=None,
names=["word", "count"],
)
df["idf"] = np.log(df["count"].sum() / df["count"])
df.sort_values(by="idf", ascending=True, inplace=True)
return df
Functions
baseline_accuracy(labels)
Get the accuracy of a classifier that always predicts the majority class.
Usage:
>>> baseline_accuracy([0, 1])
50.0
Parameters:

Name | Type | Description | Default
---|---|---|---
labels | List | List of class labels. | required

Returns:

Type | Description
---|---
float | Baseline accuracy
Source code in fns/metrics.py
def baseline_accuracy(labels: List) -> float:
"""
Get accuracy for always majority class classifier.
Usage:
```python
>>> baseline_accuracy([0, 1])
50.0
```
Args:
labels: List of class labels.
Returns:
Baseline accuracy
"""
(label, count), *_ = Counter(labels).most_common(1)
return count / len(labels) * 100.0
benchmark_function(fn, repeat=5)
Benchmark time taken for a function and return metrics.
Parameters:

Name | Type | Description | Default
---|---|---|---
fn | Callable | A python function | required
repeat | int | Number of samples | 5

Returns:

Type | Description
---|---
Dict | Dictionary of total times, mean and std of times
Source code in fns/metrics.py
def benchmark_function(fn: Callable, repeat: int = 5) -> Dict:
"""
Benchmark time taken for a function and return metrics.
Args:
fn: A python function
repeat: Number of samples
Returns:
Dictionary of total times, mean and std of times
"""
iteration_times = timeit.repeat(fn, repeat=repeat, number=1)
return {
"time": iteration_times,
"mean": np.mean(iteration_times),
"std": np.std(iteration_times),
}
clustering_report(y_true, y_pred)
Generate cluster evaluation metrics.
Parameters:

Name | Type | Description | Default
---|---|---|---
y_true | | Array of actual labels | required
y_pred | | Array of predicted clusters | required

Returns:

Type | Description
---|---
DataFrame | Pandas DataFrame with metrics.
Source code in fns/metrics.py
def clustering_report(y_true, y_pred) -> pd.DataFrame:
"""
Generate cluster evaluation metrics.
Args:
y_true: Array of actual labels
y_pred: Array of predicted clusters
Returns:
Pandas DataFrame with metrics.
"""
return pd.DataFrame(
{
"Homogeneity": M.homogeneity_score(y_true, y_pred),
"Completeness": M.completeness_score(y_true, y_pred),
"V-Measure": M.v_measure_score(y_true, y_pred),
"Adjusted Rand Index": M.adjusted_rand_score(y_true, y_pred),
"Adjusted Mutual Information": M.adjusted_mutual_info_score(y_true, y_pred),
},
index=["value"],
).T
jaccard(x, y)
Compute jaccard similarity (intersection over union).
Parameters:

Name | Type | Description | Default
---|---|---|---
x | | Array-like object | required
y | | Array-like object | required

Returns:

Type | Description
---|---
float | Intersection Over Union score
Source code in fns/metrics.py
def jaccard(x, y) -> float:
"""
Compute jaccard similarity (intersection over union).
Args:
x: Array-like object
y: Array-like object
Returns:
Intersection Over Union score
"""
s1 = set(x)
s2 = set(y)
if len(s1) == 0 and len(s2) == 0:
return 0
return len(s1 & s2) / len(s1 | s2)
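Example (illustrative):
```python
>>> jaccard([1, 2, 3], [2, 3, 4])
0.5
```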
missing_value_percent(df)
Get the percentage of missing values in each column.
Parameters:

Name | Type | Description | Default
---|---|---|---
df | DataFrame | Pandas DataFrame | required

Returns:

Type | Description
---|---
DataFrame | Percentage of missing value in each column.
Source code in fns/metrics.py
def missing_value_percent(df: pd.DataFrame) -> pd.DataFrame:
"""
Get the percentage of missing values in each column.
Args:
df: Pandas DataFrame
Returns:
Percentage of missing value in each column.
"""
num_rows = len(df)
return (df.isna().sum() / num_rows * 100.0).sort_values(ascending=False)
multilabel_classification_report(y_true, y_pred)
Compute all metrics for a multi-label classification problem.
Parameters:

Name | Type | Description | Default
---|---|---|---
y_true | | True binarized labels | required
y_pred | | Predicted binarized labels | required

Returns:

Type | Description
---|---
Series | Pandas series of metrics
Source code in fns/metrics.py
def multilabel_classification_report(y_true, y_pred) -> pd.Series:
"""
Compute all metrics for a multi-label classification problem.
Args:
y_true: True binarized labels
y_pred: Predicted binarized labels
Returns:
Pandas series of metrics
"""
scores = {
"accuracy": M.accuracy_score(y_true, y_pred),
"precision_macro": M.precision_score(y_true, y_pred, average="macro"),
"recall_macro": M.recall_score(y_true, y_pred, average="macro"),
"f1_samples": M.f1_score(y_true, y_pred, average="samples"),
"f1_macro": M.f1_score(y_true, y_pred, average="macro"),
"f1_weighted": M.f1_score(y_true, y_pred, average="weighted"),
"hamming_loss": M.hamming_loss(y_true, y_pred),
}
return pd.Series(scores)
n_clusters(data)
Generate number of clusters to create.
Heuristic: Number of clusters = square root of total data points
Parameters:

Name | Type | Description | Default
---|---|---|---
data | | Total number of data points or the data point itself | required

Returns:

Type | Description
---|---
int | Number of clusters
Source code in fns/metrics.py
def n_clusters(data) -> int:
"""
Generate number of clusters to create.
Heuristic:
Number of clusters = square root of total data points
Args:
data: Total number of data points or the data point itself
Returns:
Number of clusters
"""
if type(data) is int:
total_rows = data
else:
total_rows = len(set(data))
return int(math.sqrt(total_rows))
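Example (illustrative; an integer is treated as the number of data points, any other collection is reduced to its unique elements first):
```python
>>> n_clusters(100)
10
>>> n_clusters(["a", "b", "c", "a"])  # 3 unique data points
1
```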
outlier_cutoff(values)
Generate the lower and upper bound for outliers.
Extra:
Lower bound: < Q1 - 1.5 * IQR
Upper bound: > Q3 + 1.5 * IQR
Parameters:

Name | Type | Description | Default
---|---|---|---
values | List | List of numerical values | required

Returns:

Type | Description
---|---
Tuple[float, float] | Tuple of (lower-cutoff, upper-cutoff)
Source code in fns/metrics.py
def outlier_cutoff(values: List) -> Tuple[float, float]:
"""
Generate the lower and upper bound for outliers.
Extra:
```
Lower bound: < Q1 - 1.5 * IQR
Upper bound: > Q3 + 1.5 * IQR
```
Args:
values: List of numerical values
Returns:
Tuple of (lower-cutoff, upper-cutoff)
"""
q1 = np.quantile(values, 0.25)
q3 = np.quantile(values, 0.75)
iqr = q3 - q1
lower_threshold = q1 - 1.5 * iqr
upper_threshold = q3 + 1.5 * iqr
return lower_threshold, upper_threshold
sorted_classification_report(y_true, y_pred, **kwargs)
Generate class-wise classification report sorted from worst to best.
Parameters:

Name | Type | Description | Default
---|---|---|---
y_true | | Actual labels | required
y_pred | | Predicted labels | required

Returns:

Type | Description
---|---
DataFrame | Classification report in sorted form.
Source code in fns/metrics.py
def sorted_classification_report(y_true, y_pred, **kwargs) -> pd.DataFrame:
"""
Generate class-wise classification report sorted from worst to best.
Args:
y_true: Actual labels
y_pred: Predicted labels
Returns:
Classification report in sorted form.
"""
base_report = M.classification_report(y_true, y_pred, output_dict=True, **kwargs)
base_report_df = pd.DataFrame.from_dict(base_report).T
class_wise_df = base_report_df.iloc[:-3].sort_values(by="f1-score")
summary_df = base_report_df.iloc[-3:]
combined_df = pd.concat([class_wise_df, summary_df])
combined_df["support"] = combined_df["support"].astype(int)
return combined_df
Functions
grid_report(cv)
Display results from cross-validation.
Parameters:

Name | Type | Description | Default
---|---|---|---
cv | | Result of cross-validation | required

Returns:

Type | Description
---|---
DataFrame | Pandas DataFrame
Source code in fns/model_selection.py
def view_result_table(cv) -> pd.DataFrame:
"""
Display results from cross-validation.
Args:
cv: Result of cross-validation
Returns:
Pandas DataFrame
"""
columns = ["params", "mean_test_score", "std_test_score", "rank_test_score"]
return pd.DataFrame(cv.cv_results_)[columns].sort_values(by=["rank_test_score"])
train_val_size(dataset, val_ratio=0.1)
Return the train and validation data sizes based on split ratio.
Parameters:

Name | Type | Description | Default
---|---|---|---
dataset | | A python collection | required
val_ratio | float | Ratio for validation dataset | 0.1

Returns:

Type | Description
---|---
Tuple[int, int] | Tuple of number of rows for (training, validation)
Source code in fns/model_selection.py
def train_val_size(dataset, val_ratio: float = 0.1) -> Tuple[int, int]:
"""
Return the train and validation data sizes based on split ratio.
Args:
dataset: A python collection
val_ratio: Ratio for validation dataset
Returns:
Tuple of number of rows for (training, validation)
"""
val_size = int(val_ratio * len(dataset))
train_size = len(dataset) - val_size
return train_size, val_size
view_result_table(cv)
Display results from cross-validation.
Parameters:

Name | Type | Description | Default
---|---|---|---
cv | | Result of cross-validation | required

Returns:

Type | Description
---|---
DataFrame | Pandas DataFrame
Source code in fns/model_selection.py
def view_result_table(cv) -> pd.DataFrame:
"""
Display results from cross-validation.
Args:
cv: Result of cross-validation
Returns:
Pandas DataFrame
"""
columns = ["params", "mean_test_score", "std_test_score", "rank_test_score"]
return pd.DataFrame(cv.cv_results_)[columns].sort_values(by=["rank_test_score"])
Functions
validate_multiple_labels(y_raw)
Validate binarization of labels in a multi-label setting.
Parameters:

Name | Type | Description | Default
---|---|---|---
y_raw | List[List] | Raw list of list of labels. | required

Returns:

Type | Description
---|---
None |
Source code in fns/multi_label.py
def validate_multiple_labels(y_raw: List[List]) -> None:
"""
Validate binarization of labels in a multi-label setting.
Args:
y_raw: Raw list of list of labels.
Returns:
"""
y = MultiLabelBinarizer().fit_transform(y_raw)
# Assert that every sample has atleast one label
assert (y.sum(axis=1) == 0).sum() == 0
# Assert that every label is assigned to some data point
assert (y.sum(axis=0) == 0).sum() == 0
# Assert that no label is assigned to only one data point
assert not (y.sum(axis=0) == 1).any()
Functions
download(file_path)
Download a file at given path.
Parameters:

Name | Type | Description | Default
---|---|---|---
file_path | | File path | required

Returns:

Type | Description
---|---
None | None
Source code in fns/notebook.py
def download(file_path) -> None:
"""
Download a file at given path.
Args:
file_path: File path
Returns:
None
"""
from IPython.display import Javascript
script = f"""
var host = window.location.host;
var downloadLink = window.location.protocol + "//" + host + "/files/{file_path}"
window.open(downloadLink)
"""
return Javascript(script)
download_df(df, csv_path=None)
Download a dataframe as a CSV with a random filename.
The filename is set to a random UUID.
Parameters:

Name | Type | Description | Default
---|---|---|---
df | DataFrame | Pandas DataFrame | required
csv_path | | CSV filename. | None

Returns:

Type | Description
---|---
None | None
Source code in fns/notebook.py
def download_df(df: pd.DataFrame, csv_path=None) -> None:
"""
Download a dataframe as a CSV with a random filename.
The filename is set to a random UUID.
Args:
df: Pandas DataFrame
csv_path: CSV filename.
Returns:
None
"""
from IPython.display import display
if not csv_path:
from uuid import uuid4
csv_path = f"{uuid4()}.csv"
df.to_csv(csv_path, index=False)
display(download(file_path=csv_path))
time.sleep(1)
Path(csv_path).unlink()
filter_column(df, column_name)
Show an interactive widget to filter a column in dataframe.
Parameters:

Name | Type | Description | Default
---|---|---|---
df | DataFrame | Pandas DataFrame | required
column_name | str | Column Name of the DataFrame | required

Returns:

Type | Description
---|---
None | Interactive widget for filtering.
Source code in fns/notebook.py
def filter_column(df: pd.DataFrame, column_name: str) -> None:
"""
Show an interactive widget to filter a column in dataframe.
Args:
df: Pandas DataFrame
column_name: Column Name of the DataFrame
Returns:
Interactive widget for filtering.
"""
from ipywidgets import interact
options = sorted(df[column_name].unique())
interact(lambda value: df[df[column_name] == value], value=options)
highlight_phrases(original_text, phrases, color_palette='Greens', weight=0.2)
Highlight a list of phrases in a text.
Parameters:

Name | Type | Description | Default
---|---|---|---
original_text | str | Sentence | required
phrases | Union[List[str], str] | A single phrase or a list of phrases | required
color_palette | str | Any valid matplotlib color palette name | 'Greens'
weight | float | Darkness of the color | 0.2

Returns:

Type | Description
---|---
None | None
Source code in fns/notebook.py
def highlight_phrases(
original_text: str,
phrases: Union[List[str], str],
color_palette: str = "Greens",
weight: float = 0.2,
) -> None:
"""
Highlight a list of phrases in a text.
Args:
original_text: Sentence
phrases: A single phrase or a list of phrases
color_palette: Any valid matplotlib color palette name
weight: Darkness of the color
Returns:
None
"""
import matplotlib.cm
from IPython.display import HTML, display
html = original_text
cmap = matplotlib.cm.get_cmap(color_palette)
color = f"rgba{cmap(weight, bytes=True)}"
if type(phrases) is str:
phrases = [phrases]
for phrase in phrases:
highlighted_phrase = (
f'<span style="background-color: {color}; font-weight: {weight * 800};">'
f"{phrase}"
f"</span>"
)
html = html.replace(phrase, highlighted_phrase)
display(HTML(f'<p style="color: #444; font-size:1.5em;">{html}</p>'))
print_bullets(lines)
Display a list of text as bullet points.
Parameters:

Name | Type | Description | Default
---|---|---|---
lines | List[str] | List of texts | required

Returns:

Type | Description
---|---
None | None
Source code in fns/notebook.py
def print_bullets(lines: List[str]) -> None:
"""
Display a list of text as bullet points.
Args:
lines: List of texts
Returns:
None
"""
bullet_points = "\n".join(f"- `{line}`" for line in sorted(lines))
print_markdown(bullet_points)
print_header(text, level=2)
Display a text as markdown header.
Parameters:

Name | Type | Description | Default
---|---|---|---
text | str | Text | required
level | int | 2 for H2, 3 for H3 upto 6. | 2

Returns:

Type | Description
---|---
None | None
Source code in fns/notebook.py
def print_header(text: str, level: int = 2) -> None:
"""
Display a text as markdown header.
Args:
text: Text
level: 2 for H2, 3 for H3 upto 6.
Returns:
None
"""
print_markdown(f'{"#" * level} {text}')
search_dataframe(df)
Show an interactive widget to search text fields of a dataframe.
Parameters:

Name | Type | Description | Default
---|---|---|---
df | DataFrame | Pandas DataFrame | required

Returns:

Type | Description
---|---
None | Interactive widget for searching.
Source code in fns/notebook.py
def search_dataframe(df: pd.DataFrame) -> None:
"""
Show an interactive widget to search text fields of a dataframe.
Args:
df: Pandas DataFrame
Returns:
Interactive widget for searching.
"""
from ipywidgets import interact
from IPython.display import display
def _search(query: str, column: str):
if query:
with pd.option_context(
"display.max_rows", None, "display.max_columns", None
):
filtered_df = df[
df[column].str.contains(query, case=False, regex=False)
]
display(filtered_df)
string_columns = df.select_dtypes("object").columns.tolist()
interact(_search, query="", column=string_columns)
show_examples(df, group_column, data_column, n=5)
Show random examples for each sub-group in a dataframe.
Parameters:

Name | Type | Description | Default
---|---|---|---
df | DataFrame | Dataframe | required
group_column | str | Column name for performing group by | required
data_column | str | Column to show examples for | required
n | int | Number of examples | 5

Returns:

Type | Description
---|---
| Markdown
Source code in fns/notebook.py
def show_examples(df: pd.DataFrame, group_column: str, data_column: str, n: int = 5):
"""
Show random examples for each sub-group in a dataframe.
Args:
df: Dataframe
group_column: Column name for performing group by
data_column: Column to show examples for
n: Number of examples
Returns:
Markdown
"""
from IPython.display import Markdown
generated_text = ""
for group_name, subset in df.explode(group_column).groupby(group_column):
examples = subset[data_column].sample(n)
generated_text += f"## {group_name}\n\n"
generated_text += "\n".join([f"- {example}" for example in examples])
generated_text += "\n\n"
return Markdown(generated_text)
Functions
confusion_matrix_plot(y_true, y_pred)
Plot a confusion matrix.
Parameters:

Name | Type | Description | Default
---|---|---|---
y_true | | List of true labels | required
y_pred | | List of prediction labels | required

Returns:

Type | Description
---|---
None |
Source code in fns/plot.py
def confusion_matrix_plot(y_true, y_pred) -> None:
"""
Plot a confusion matrix.
Args:
y_true: List of true labels
y_pred: List of prediction labels
Returns:
"""
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix
cm = confusion_matrix(y_true, y_pred)
plot = ConfusionMatrixDisplay(confusion_matrix=cm).plot()
plot.ax_.set_title("Confusion Matrix")
Functions
combine_hyphenated_word(text)
Combine words in text that contain hyphen.
Example: e-mail to email
Parameters:

Name | Type | Description | Default
---|---|---|---
text | str | A sentence | required

Returns:

Type | Description
---|---
str | Processed sentence
Source code in fns/preprocessing.py
def combine_hyphenated_word(text: str) -> str:
"""
Combine words in text that contain hyphen.
Example: e-email to email
Args:
text: A sentence
Returns:
Processed sentence
"""
return " ".join(
w.replace("-", "") if _re_hyphen_word.match(w) else w for w in text.split()
)
normalize_json(json_data)
Convert any non-standard types in dictionary to basic types.
The normalization prevents errors during serialization.
Usage:
>>> normalize_json({'nums': np.array([1, 2, 3])})
{'nums': [1, 2, 3]}
Parameters:

Name | Type | Description | Default
---|---|---|---
json_data | Dict | Dictionary | required

Returns:

Type | Description
---|---
Dict | Normalized dictionary
Source code in fns/preprocessing.py
def normalize_json(json_data: Dict) -> Dict:
"""
Convert any non-standard types in dictionary to basic types.
The normalization prevent errors during serialization.
Usage:
```python
>>> normalize_json({'nums': np.array([1, 2, 3])})
{'nums': [1, 2, 3]}
```
Args:
json_data: Dictionary
Returns:
Normalized dictionary
"""
return json.loads(json.dumps(json_data, cls=NpEncoder))
remove_hashtag(t)
Remove hashtag from the text.
Parameters:

Name | Type | Description | Default
---|---|---|---
t | str | Text | required

Returns:

Type | Description
---|---
str | Text without hashtag
Source code in fns/preprocessing.py
def remove_hashtag(t: str) -> str:
"""
Remove hashtag from the text.
Args:
t: Text
Returns:
Text without hashtag
"""
return _re_hashtag.sub("", t)
remove_hyperlink(t)
Remove hyperlinks from a text.
Parameters:

Name | Type | Description | Default
---|---|---|---
t | str | Text | required

Returns:

Type | Description
---|---
str | Text without hyperlinks.
Source code in fns/preprocessing.py
def remove_hyperlink(t: str) -> str:
"""
Remove hyperlinks from a text.
Args:
t: Text
Returns:
Text without hyperlinks.
"""
return _re_hyperlink.sub("", t)
remove_multiple_commas(t)
Substitute multiple consecutive commas with a single comma.
Usage:
>>> remove_multiple_commas('a,,b,c')
'a,b,c'
Parameters:

Name | Type | Description | Default
---|---|---|---
t | str | Text | required

Returns:

Type | Description
---|---
str | Text without multiple commas.
Source code in fns/preprocessing.py
def remove_multiple_commas(t: str) -> str:
"""
Substitute multiple consecutive commas with a single comma.
Usage:
```python
>>> remove_multiple_commas('a,,b,c')
'a,b,c'
```
Args:
t: Text
Returns:
Text without multiple commas.
"""
return _re_comma.sub(",", t)
remove_multiple_space(t)
Remove multiple spaces from the text.
Adapted from: https://github.com/fastai/fastai/blob/master/fastai/text/core.py
Parameters:

Name | Type | Description | Default
---|---|---|---
t | str | Text | required

Returns:

Type | Description
---|---
str | Text without multiple space.
Source code in fns/preprocessing.py
def remove_multiple_space(t: str) -> str:
"""
Remove multiple spaces from the text.
Adapted from: https://github.com/fastai/fastai/blob/master/fastai/text/core.py
Args:
t: Text
Returns:
Text without multiple space.
"""
return _re_space.sub(" ", t)
remove_new_lines(text)
Strip away new lines at end.
Parameters:

Name | Type | Description | Default
---|---|---|---
t | | Text | required

Returns:

Type | Description
---|---
str | Text without newline at end.
Source code in fns/preprocessing.py
def remove_new_lines(text: str) -> str:
"""
Strip away new lines at end.
Args:
t: Text
Returns:
Text without newline at end.
"""
if isinstance(text, str):
return text.replace("\\n", "").strip()
return text
remove_punctuation(text)
Remove all punctuations from a text.
Parameters:

Name | Type | Description | Default
---|---|---|---
text | str | Sentence | required
Source code in fns/preprocessing.py
def remove_punctuation(text: str) -> str:
"""
Remove all punctuations from a text.
Args:
text: Sentence
"""
return "".join(t for t in text if t not in string.punctuation)
remove_retweet(t)
Remove RT from the text.
Parameters:

Name | Type | Description | Default
---|---|---|---
t | str | Text | required

Returns:

Type | Description
---|---
str | Text without RT symbol.
Source code in fns/preprocessing.py
def remove_retweet(t: str) -> str:
"""
Remove RT from the text.
Args:
t: Text
Returns:
Text without RT symbol.
"""
return _re_retweet.sub("", t)
remove_separator(text)
Keep only alphabetic characters, numbers and spaces.
Parameters:

Name | Type | Description | Default
---|---|---|---
text | str | | required

Returns:

Type | Description
---|---
str |
Source code in fns/preprocessing.py
def remove_separator(text: str) -> str:
"""
Keep only alphabet, number and space.
Args:
text:
Returns:
"""
no_separator_regex = re.compile(r"[^a-zA-Z0-9\s]")
return no_separator_regex.sub("", text)
Functions
create_download_link(dataframe, filename, file_type='csv', index=False, header=True)
Generate a download link for a pandas dataframe.
Parameters:

Name | Type | Description | Default
---|---|---|---
dataframe | | Pandas DataFrame | required
filename | str | Name of exported file | required
file_type | str | Either 'csv' or 'tsv' | 'csv'
index | bool | Whether to include index of dataframe or not | False
header | bool | Whether to include header of dataframe or not | True

Returns:

Type | Description
---|---
| Markdown to place in st.markdown(...)
Source code in fns/streamlit_utils.py
def create_download_link(
dataframe,
filename: str,
file_type: str = "csv",
index: bool = False,
header: bool = True,
):
"""
Generate a download link for a pandas dataframe.
Args:
dataframe: Pandas DataFrame
filename: Name of exported file
file_type: Either 'csv' or 'tsv'
index: Whether to include index of dataframe or not
header: Whether to include header of dataframe or not
Returns:
Markdown to place in st.markdown(...)
"""
if file_type == "csv":
dataframe_csv = dataframe.to_csv(index=index)
elif file_type == "tsv":
dataframe_csv = dataframe.to_csv(
index=index, sep="\t", header=header, quoting=csv.QUOTE_NONNUMERIC
)
else:
raise Exception('Invalid file_type. Allowed values are "csv" and "tsv".')
b64 = base64.b64encode(dataframe_csv.encode()).decode()
href = f'**DOWNLOAD:** <a href="data:file/csv;base64,{b64}" download="{filename}">{filename}</a>'
return href
Functions
export_fasttext_format(texts, labels, filename)
Export training data to a fasttext compatible format.
Format: __label__POSITIVE it was good
Parameters:

Name | Type | Description | Default
---|---|---|---
texts | List[str] | List of sentences | required
labels | Union[List[str], List[List[str]]] | List of single or multi-label classes | required
filename | | Exported filename | required

Returns:

Type | Description
---|---
None | None
Source code in fns/text.py
def export_fasttext_format(
texts: List[str], labels: Union[List[str], List[List[str]]], filename
) -> None:
"""
Export training data to a fasttext compatible format.
Format:
__label__POSITIVE it was good
Args:
texts: List of sentences
labels: List of single or multi-label classes
filename: Exported filename
Returns:
None
"""
output = []
for text, text_label in zip(texts, labels):
if type(text_label) is str:
text_label = [text_label]
labels = " ".join([f"__label__{label}" for label in text_label])
output.append(f"{labels} {text}\n")
with open(filename, "w") as fp:
fp.writelines(output)
extract_abbreviations(texts)
Get a list of all-capitalized words.
Example: WWW, HTTP, etc.
Parameters:

Name | Type | Description | Default
---|---|---|---
texts | List[str] | List of sentences | required

Returns:

Type | Description
---|---
List[str] | List of abbreviations
Source code in fns/text.py
def extract_abbreviations(texts: List[str]) -> List[str]:
"""
Get a list of all-capitalized words.
Example: WWW, HTTP, etc.
Args:
texts: List of sentences
Returns:
List of abbreviations
"""
combined_text = "\n".join(texts)
symbols = re.findall(r"\b[A-Z][A-Z]+\b", combined_text)
return list(set(symbols))
extract_discriminative_keywords(df, category_column, text_column, ngram=2, n=10)
Generate discriminative keywords for texts in each category.
Parameters:

Name | Type | Description | Default
---|---|---|---
df | DataFrame | Dataframe with text and category columns. | required
text_column | str | Column name containing texts | required
category_column | str | Column name for the text category | required
ngram | int | 1 for words, 2 for bigram and so on. | 2
n | int | Number of keywords to return. | 10

Returns:

Type | Description
---|---
DataFrame | Dataframe with categories in columns and top-n keywords in each column.
Source code in fns/text.py
def extract_discriminative_keywords(
df: pd.DataFrame,
category_column: str,
text_column: str,
ngram: int = 2,
n: int = 10,
) -> pd.DataFrame:
"""
Generate discriminative keywords for texts in each category.
Args:
df: Dataframe with text and category columns.
text_column: Column name containing texts
category_column: Column name for the text category
ngram: 1 for words, 2 for bigram and so on.
n: Number of keywords to return.
Returns:
Dataframe with categories in columns and top-n keywords in each columns.
"""
# Combine all texts into a single document for each category
category_docs = df.groupby(by=category_column)[text_column].apply(" ".join)
categories = category_docs.index.tolist()
tfidf = TfidfVectorizer(
ngram_range=(1, ngram),
stop_words="english",
strip_accents="unicode",
sublinear_tf=True,
)
document_vectors = tfidf.fit_transform(category_docs).A
keywords = np.array(tfidf.get_feature_names())
top_terms = document_vectors.argsort(axis=1)[:, :n]
return pd.DataFrame(keywords[top_terms].T, columns=categories)
extract_tfidf_keywords(texts, ngram=2, n=10)
Get top keywords based on mean tf-idf term score.
Parameters:

Name | Type | Description | Default
---|---|---|---
texts | List[str] | List of sentences | required
ngram | int | 1 for words, 2 for bigram and so on. | 2
n | int | Number of keywords to extract | 10

Returns:

Type | Description
---|---
List[str] | Keywords
Source code in fns/text.py
def extract_tfidf_keywords(texts: List[str], ngram: int = 2, n: int = 10) -> List[str]:
"""
Get top keywords based on mean tf-idf term score.
Args:
texts: List of sentences
ngram: 1 for words, 2 for bigram and so on.
n: Number of keywords to extract
Returns:
Keywords
"""
tfidf = TfidfVectorizer(
ngram_range=(1, ngram),
stop_words="english",
strip_accents="unicode",
sublinear_tf=True,
)
vectors = tfidf.fit_transform(texts)
term_tfidf = vectors.A.mean(axis=0)
terms = np.array(tfidf.get_feature_names())
return terms[term_tfidf.argsort()[::-1]][:n].tolist()
is_non_ascii(text)
Check if text has non-ascii characters.
Useful heuristic to find text containing emojis and non-english characters.
Parameters:

Name | Type | Description | Default
---|---|---|---
text | str | Sentence | required

Returns:

Type | Description
---|---
bool | True if the text contains non-ascii characters.
Source code in fns/text.py
def is_non_ascii(text: str) -> bool:
"""
Check if text has non-ascii characters.
Useful heuristic to find text containing emojis and non-english
characters.
Args:
text: Sentence
Returns:
True if the text contains non-ascii characters.
"""
try:
text.encode("ascii")
return False
except UnicodeEncodeError:
return True
md5_hash(text)
Generate MD5 hash of a text.
Parameters:

Name | Type | Description | Default
---|---|---|---
text | str | String | required

Returns:

Type | Description
---|---
str | MD5 hash
Source code in fns/text.py
def md5_hash(text: str) -> str:
"""
Generate MD5 hash of a text.
Args:
text: String
Returns:
MD5 hash
"""
return hashlib.md5(text.encode("utf-8")).hexdigest()
num_words(text)
Counts the number of words using whitespace as delimiter.
Parameters:

Name | Type | Description | Default
---|---|---|---
text | str | Sentence | required

Returns:

Type | Description
---|---
int | Number of words
Source code in fns/text.py
def num_words(text: str) -> int:
"""
Counts the number of words using whitespace as delimiter.
Args:
text: Sentence
Returns:
Number of words
"""
return len(text.split())
offset_by_one(x, sequence_length=3)
Generate a list of small sequences offset by 1.
Usage:
>>> offset_by_one([1, 2, 3, 4, 5], sequence_length=3)
[([1, 2, 3], [2, 3, 4])]
Parameters:

Name | Type | Description | Default
---|---|---|---
x | | Python list | required
sequence_length | int | Chunk size | 3
Source code in fns/text.py
def offset_by_one(x, sequence_length: int = 3):
"""
Generate a list of small sequences offset by 1.
Usage:
```python
>>> offset_by_one([1, 2, 3, 4, 5], sequence_length=3)
[([1, 2, 3], [2, 3, 4])]
```
Args:
x: Python list
sequence_length: Chunk size
Returns:
"""
sl = sequence_length
return [
(x[i : i + sl], x[i + 1 : i + sl + 1]) for i in range(0, len(x) - sl - 1, sl)
]
sha256hash(text)
Generate SHA256 hash of a text.
Parameters:

Name | Type | Description | Default
---|---|---|---
text | str | String | required

Returns:

Type | Description
---|---
str | SHA256 hash
Source code in fns/text.py
def sha256hash(text: str) -> str:
"""
Generate SHA256 hash of a text.
Args:
text: String
Returns:
SHA256 hash
"""
return hashlib.sha256(text.encode("utf-8")).hexdigest()
span_positions(text, phrases)
Find span position of phrases in a text.
Parameters:

Name | Type | Description | Default
---|---|---|---
text | str | Sentence | required
phrases | List[str] | List of phrases | required

Returns:

Type | Description
---|---
List[Tuple[int, int]] | List of span positions for each phrase. The span position is a tuple of start and end index.
Source code in fns/text.py
def span_positions(text: str, phrases: List[str]) -> List[Tuple[int, int]]:
"""
Find span position of phrases in a text.
Args:
text: Sentence
phrases: List of phrases
Returns:
List of span positions for each phrase.
The span position is a tuple of start and end index.
"""
capture_group = "|".join([re.escape(phrase) for phrase in phrases])
reg = re.compile(rf"\b({capture_group})\b", flags=re.IGNORECASE)
return [match.span() for match in reg.finditer(text)]
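Example (illustrative):
```python
>>> span_positions("The quick brown fox", ["quick", "fox"])
[(4, 9), (16, 19)]
```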
unique_chars(texts)
Get a list of unique characters from list of text.
Parameters:

Name | Type | Description | Default
---|---|---|---
texts | List[str] | List of sentences | required

Returns:

Type | Description
---|---
List[str] | A sorted list of unique characters
Source code in fns/text.py
def unique_chars(texts: List[str]) -> List[str]:
"""
Get a list of unique characters from list of text.
Args:
texts: List of sentences
Returns:
A sorted list of unique characters
"""
return sorted(set("".join(texts)))
window(tokens, size=3)
Generate samples for a window size.
Examples:
>>> window(['a', 'b', 'c', 'd'], size=2)
[(['a', 'b'], 'c'), (['b', 'c'], 'd')]
Parameters:

Name | Type | Description | Default
---|---|---|---
tokens | | List of tokens | required
size | int | Window size | 3

Returns:

Type | Description
---|---
| List of windowed samples
Source code in fns/text.py
def window(tokens, size: int = 3):
"""
Generate samples for a window size.
Example:
```python
>>> window(['a', 'b', 'c', 'd'], size=2)
[(['a', 'b'], 'c'), (['b', 'c'], 'd')]
```
Args:
tokens: List of tokens
size: Window size
Returns:
List of windowed samples
"""
return [
(tokens[i : i + size], tokens[i + size])
for i in range(0, len(tokens) - size, 1)
]
Functions
imagenet_index_to_class()
Get a mapping from imagenet class index to class names.
Returns:

Type | Description
---|---
Dict[int, str] | Mapping from imagenet class index to class names
Source code in fns/vision.py
def imagenet_index_to_class() -> Dict[int, str]:
"""
Get a mapping from imagenet class index to class names.
Returns:
Mapping from imagenet class index to class names
"""
raw_mapping = json.load(urlopen(IMAGENET_LABEL_TO_CLASS_URL))
return {int(index): class_name for index, class_name in raw_mapping.items()}