import getpass
import io
import logging
import multiprocessing
import os
import re
import subprocess
import warnings
from typing import Optional
from xml.etree import ElementTree
try:
import pyarrow
from pyarrow.fs import FileSelector, FileType, HadoopFileSystem
has_hdfs = True
except ImportError:
has_hdfs = False
from .fs import FS, FileStat, ForkedError, format_repr
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
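# The helpers below extract the username part of the Kerberos principal
# from ``klist`` output. A ticket cache listing is assumed to start like:
#
#   Ticket cache: FILE:/tmp/krb5cc_1000
#   Default principal: alice@EXAMPLE.COM
#
# so the principal is parsed from the second line (``alice`` in this example).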
def _parse_principal_name_from_klist(output):
output_array = output.split('\n')
if len(output_array) < 2:
return None
    principal_str = output_array[1]
    klist_principal_pattern = re.compile(
        r'Default principal: (?P<username>.+)@(?P<service>.+)')
    ret = klist_principal_pattern.match(principal_str)
if ret:
pattern_dict = ret.groupdict()
return pattern_dict['username']
else:
return None
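# ``klist -k`` output is assumed to look like:
#
#   Keytab name: FILE:/path/to/your.keytab
#   KVNO Principal
#   ---- ------------------------------
#      1 alice@EXAMPLE.COM
#
# so the first keytab entry sits on the fourth line, and only its
# username part (``alice``) is returned.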
def _parse_principal_name_from_keytab(output):
output_array = output.split('\n')
if len(output_array) < 4:
return None
    principal_str = output_array[3]
    keytab_principal_pattern = re.compile(
        r'\s+\d+ (?P<username>.+)@(?P<service>.+)')
    ret = keytab_principal_pattern.match(principal_str)
if ret:
pattern_dict = ret.groupdict()
return pattern_dict['username']
else:
return None
def _get_principal_name_from_keytab():
output = _run_klist(use_keytab=True)
if output is None:
return None
return _parse_principal_name_from_keytab(output.decode('utf-8'))
def _get_principal_name_from_klist():
output = _run_klist()
if output is None:
return None
return _parse_principal_name_from_klist(output.decode('utf-8'))
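# Runs ``klist`` (optionally with ``-k`` to list keytab entries) and returns
# its raw stdout as bytes, or None when klist is missing or reports an error.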
def _run_klist(use_keytab=False):
try:
command = ['klist']
if use_keytab:
command += ['-k']
pipe = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = pipe.communicate()
if out == b'' and err != b'':
return None
else:
return out
except OSError:
# klist is not found
return None
class HdfsFileStat(FileStat):
"""Detailed information of a file in HDFS
Attributes:
filename (str): Derived from `~FileStat`.
        last_modified (float): Derived from `~FileStat`.
            No sub-second precision.
last_accessed (float): UNIX timestamp of last access time.
No sub-second precision.
mode (int): Derived from `~FileStat`.
size (int): Derived from `~FileStat`.
"""
def __init__(self, info):
self._info = info
mode = 0
if info.type == FileType.File:
mode |= 0o100000
elif info.type == FileType.Directory:
mode |= 0o40000
self.filename = info.base_name
self.mode = mode
self.last_modified = info.mtime.timestamp()
        # atime is not exposed by the new PyArrow FileSystem API;
        # fall back to mtime
self.last_accessed = info.mtime.timestamp()
self.size = info.size
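# Points ARROW_LIBHDFS_DIR at the directory containing libhdfs.so, looking
# under ``$HADOOP_HOME/lib`` and then ``$HADOOP_HOME/lib/native``, unless
# the variable is already set.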
def _ensure_arrow_envs(hadoop_home):
if os.getenv("ARROW_LIBHDFS_DIR") is None:
arrow_dir = os.path.join(hadoop_home, "lib")
libhdfs = os.path.join(arrow_dir, "libhdfs.so")
if os.path.exists(libhdfs):
os.environ["ARROW_LIBHDFS_DIR"] = arrow_dir
else:
arrow_dir = os.path.join(hadoop_home, "lib/native")
libhdfs = os.path.join(arrow_dir, "libhdfs.so")
if os.path.exists(libhdfs):
os.environ["ARROW_LIBHDFS_DIR"] = arrow_dir
else:
msg = "No libhdfs.so found from $HADOOP_HOME: {}".format(
hadoop_home)
raise RuntimeError(msg)
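# Connects to HDFS via PyArrow: reads hdfs-site.xml from $HADOOP_CONF_DIR
# (default /etc/hadoop/conf), fills in ARROW_LIBHDFS_DIR and CLASSPATH when
# they are missing, and opens the first name service listed in
# ``dfs.nameservices``.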
def _create_fs():
confdir = os.getenv('HADOOP_CONF_DIR', '/etc/hadoop/conf')
conffile = os.path.join(confdir, 'hdfs-site.xml')
root = ElementTree.parse(conffile)
    # Valid environment variables are required. $HADOOP_HOME is typically
    # /opt/cloudera/parcels/CDH/lib
if os.getenv("HADOOP_HOME"):
hadoop_home = os.getenv("HADOOP_HOME")
_ensure_arrow_envs(hadoop_home)
if os.getenv("CLASSPATH") is None:
# TODO(kuenishi): CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob`
cmd = ["hdfs", "classpath", "--glob"]
cp = subprocess.run(cmd, stdout=subprocess.PIPE)
cp.check_returncode()
os.environ["CLASSPATH"] = cp.stdout.decode()
assert os.getenv("ARROW_LIBHDFS_DIR"), "LIBHDFS not found"
assert os.getenv("CLASSPATH"), "CLASSPATH not defined"
configs = {}
for e in root.findall('./property'):
name = None
for c in e:
if c.tag == 'name':
name = c.text
elif c.tag == 'value':
value = c.text
if name:
configs[name] = value
for nameservice in configs.get('dfs.nameservices', '').split(','):
        # TODO(kuenishi): We don't have a use case where we switch
        # among multiple name services from a single HADOOP_CONF_DIR
        # conf. Thus we ignore fs.defaultFS and just take the very
        # first name service that appears in hdfs-site.xml.
return nameservice, HadoopFileSystem(nameservice, 0)
    else:
        raise RuntimeError("No nameservice found.")
class Hdfs(FS):
'''Hadoop FileSystem wrapper
    To use HDFS, PFIO requires ``$HADOOP_HOME`` to be defined before
    initialization. If it is not defined, ``ARROW_LIBHDFS_DIR`` must
    be defined instead. ``$CLASSPATH`` is also needed in case the
    ``hdfs`` command is not available from ``$PATH``.
.. warning:: It is strongly discouraged to use :class:`Hdfs` under
multiprocessing. Once the object detects the process id changed
(which means it is forked), the object raises :class:`ForkedError`
before doing anything. If you do *need* forking, for example, PyTorch
DataLoader with multiple workers for performance, it is strongly
        recommended not to instantiate :class:`Hdfs` before forking. Details
        are described in PFIO issue #123. A simple workaround is to set the
        multiprocessing start method to ``'forkserver'`` and to start the
        very first child process before anything else.
.. code-block::
import multiprocessing
multiprocessing.set_start_method('forkserver')
p = multiprocessing.Process()
p.start()
p.join()
See: https://github.com/pfnet/pfio/issues/123
    .. note:: With the environment variable
        ``KRB5_KTNAME=path/to/your.keytab`` set, the ``hdfs``
        handler automatically starts periodic renewal of the
        Kerberos ticket using `krbticket
        <https://pypi.org/project/krbticket/>`_. The update
        frequency is every 10 minutes by default.
.. note::
        Only the username in the first entry in the
        keytab will be used to update the Kerberos ticket.
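    A minimal usage sketch (illustrative only; it assumes :class:`Hdfs` is
    importable from ``pfio.v2`` and that ``foo.txt`` already exists under
    your HDFS home directory):

    .. code-block::

        from pfio.v2 import Hdfs

        # Connects using $HADOOP_CONF_DIR settings; cwd defaults to
        # /user/<username>.
        with Hdfs() as fs:
            with fs.open('foo.txt', 'r') as f:
                print(f.read())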
'''
def __init__(self, cwd=None, create=False, **_):
super().__init__()
self._nameservice, self._fs = _create_fs()
assert self._fs is not None
self.username = self._get_principal_name()
self.cwd = os.path.join('/user', self.username)
if cwd is not None:
if cwd.startswith('/'):
self.cwd = cwd
else:
self.cwd = os.path.join(self.cwd, cwd)
if not self.isdir(''):
if create:
# Since this process (isdir -> makedirs) is not atomic,
# makedirs can conflict in case of a parallel workload.
self.makedirs('', exist_ok=True)
else:
raise ValueError('{} must be a directory'.format(self.cwd))
if multiprocessing.get_start_method() != 'forkserver':
# See https://github.com/pfnet/pfio/pull/123 for detail
warnings.warn('Non-forkserver start method under HDFS detected.')
def _reset(self):
if multiprocessing.get_start_method() != 'forkserver':
raise ForkedError()
self._nameservice, self._fs = _create_fs()
def __getstate__(self):
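        # The low-level HadoopFileSystem handle cannot be pickled;
        # exclude it from the serialized state.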
state = self.__dict__.copy()
state['_fs'] = None
return state
def __setstate__(self, state):
self.__dict__ = state
def __repr__(self):
return format_repr(
Hdfs,
{
"cwd": self._cwd,
},
)
def _get_principal_name(self):
# get the default principal name from `klist` cache
principal_name = _get_principal_name_from_klist()
if principal_name is not None:
return principal_name
# try getting principal name from keytab
principal_name = _get_principal_name_from_keytab()
if principal_name is not None:
return principal_name
        # if everything else fails, fall back to the login username
return self._get_login_username()
def _get_login_username(self):
return getpass.getuser()
def open(self, file_path, mode='rb',
buffering=-1, encoding=None, errors=None,
newline=None, closefd=True, opener=None):
self._checkfork()
path = os.path.join(self.cwd, file_path)
try:
if 'r' in mode:
file_obj = self._fs.open_input_file(path)
else:
file_obj = self._fs.open_output_stream(path)
except pyarrow.lib.ArrowIOError as e:
raise IOError("open file error :{}".format(str(e)))
return self._wrap_file_obj(file_obj, mode, encoding, errors, newline)
def _wrap_file_obj(self, file_obj, mode, encoding, errors, newline):
if 'b' not in mode:
return io.TextIOWrapper(file_obj, encoding, errors, newline)
elif 'r' in mode:
            # Wrap file_obj with io.BufferedReader for ``peek()``, to
            # significantly improve unpickle performance.
return io.BufferedReader(file_obj)
elif 'w' in mode:
return io.BufferedWriter(file_obj)
else:
raise ValueError("invalid option")
def subfs(self, rel_path):
return Hdfs(os.path.join(self.cwd, rel_path))
def close(self):
pass
def list(self, path: Optional[str] = "", recursive=False, detail=False):
self._checkfork()
if not self.isdir(path):
raise NotADirectoryError(path)
path = os.path.join(self.cwd, "" if path is None else path)
norm_path = self._fs.normalize_path(path).rstrip('/')
infos = self._fs.get_file_info(FileSelector(path, recursive=recursive))
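        # Yield names relative to the listed directory by stripping the
        # normalized prefix; with detail=True, yield HdfsFileStat objects
        # instead.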
for file_info in infos:
if detail:
yield HdfsFileStat(file_info)
else:
yield file_info.path[len(norm_path)+1:]
def stat(self, path):
self._checkfork()
path = os.path.join(self.cwd, path)
info = self._fs.get_file_info(path)
if info.type == FileType.NotFound:
raise FileNotFoundError()
else:
return HdfsFileStat(info)
def isdir(self, path: Optional[str]):
self._checkfork()
path = os.path.join(self.cwd, "" if path is None else path)
info = self._fs.get_file_info(path)
return info.type == FileType.Directory
def mkdir(self, path: str, *args, dir_fd=None):
self._checkfork()
path = os.path.join(self.cwd, path)
return self._fs.create_dir(path)
def makedirs(self, path: str, mode=0o777, exist_ok=False):
self._checkfork()
if self.exists(path) and self.isdir(path) and not exist_ok:
raise NotADirectoryError()
path = os.path.join(self.cwd, path)
return self._fs.create_dir(path, recursive=True)
def exists(self, path: str):
path = os.path.join(self.cwd, path)
info = self._fs.get_file_info(path)
return info.type != FileType.NotFound
def rename(self, src, dst):
self._checkfork()
s = os.path.join(self.cwd, src)
d = os.path.join(self.cwd, dst)
return self._fs.move(s, d)
def remove(self, path, recursive=False):
delpath = os.path.join(self.cwd, path)
if self.isdir(path):
if recursive:
self._fs.delete_dir(delpath)
elif list(self.list(path)):
raise RuntimeError("Directory not empty: {}".format(delpath))
else:
self._fs.delete_dir(delpath)
else:
self._fs.delete_file(delpath)
def normpath(self, file_path: str) -> str:
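        # Build a fully-qualified URI of the form
        # hdfs://<nameservice>/<normalized path>.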
path = os.path.join(self.cwd, file_path)
norm_path = self._fs.normalize_path(path).rstrip('/')
return f"hdfs://{self._nameservice}/{norm_path}"