This is a refactoring to make the pattern we use to build concrete objects from python config objects more explicit.
I also moved the clocks
stuff in a submodule, but nothing should really change.
The first trait introduced is ParentClass
. This trait should be implemented by any class we expose in Python that is a parent class of config objects, and it requires a method to convert from parent to one of the children:
pub(crate) trait ParentClass {
type Children;
fn get_subclass(&self, py: Python) -> StringResult<Self::Children>;
}
Then we have one trait for each module, that specific configurations have to implement. This trait requires a method to build a concrete object from a config one, so in the inputs
modules it builds an InputReader
from a child of the InputConfig
class.
For example in the input module we have:
pub(crate) trait InputBuilder {
fn build(
&self,
py: Python,
worker_index: WorkerIndex,
worker_count: usize,
resume_snapshot: Option<StateBytes>,
) -> StringResult<Box<dyn InputReader<TdPyAny>>>;
}
And inputs have to implement this trait, example in ManualInputconfig:
impl InputBuilder for ManualInputConfig {
fn build(
&self,
py: Python,
worker_index: WorkerIndex,
worker_count: usize,
resume_snapshot: Option<StateBytes>,
) -> StringResult<Box<dyn InputReader<TdPyAny>>> {
Ok(Box::new(ManualInput::new(
py,
self.input_builder.clone(),
worker_index,
worker_count,
resume_snapshot,
)))
}
}
The nice thing is that this allows us to implement InputBuilder
for the generic InputConfig
if it implements the ParentClass
trait:
impl InputBuilder for Py<InputConfig> {
fn build(
&self,
py: Python,
worker_index: WorkerIndex,
worker_count: usize,
resume_snapshot: Option<StateBytes>,
) -> StringResult<Box<dyn InputReader<TdPyAny>>> {
self.get_subclass(py)?
.build(py, worker_index, worker_count, resume_snapshot)
}
}
So that we can do this:
let input_reader = input_config.build(py, worker_index, worker_count, step_resume_state)?;
instead of what we used to do:
let input_reader = build_input_reader(
py,
input_config,
worker_index,
worker_count,
step_resume_state,
)?;
NOTE: I stopped here, but we could think of a more generic PyBuilder
trait so that we don't need a new trait for each module, but after trying it I think it makes the implementation a bit harder to read:
pub(crate) trait PyBuilder {
type Input;
type Output;
fn build(&self, py: Python, params: Self::Input) -> StringResult<Self::Output>;
}
Which turn the impl for ManualInputConfig into this
impl PyBuilder for ManualInputConfig {
type Input = (WorkerIndex, usize, Option<StateBytes>);
type Output = Box<dyn InputReader<TdPyAny>>;
fn build(
&self,
py: Python,
(worker_index, worker_count, resume_snapshot): Self::Input,
) -> StringResult<Self::Output> {
Ok(Box::new(ManualInput::new(
py,
self.input_builder.clone(),
worker_index,
worker_count,
resume_snapshot,
)))
}
}
And the ParentClass impl to something like:
impl ParentClass for Py<InputConfig> {
type Children = Box<
dyn PyBuilder<
Input = (WorkerIndex, usize, Option<StateBytes>),
Output = Box<dyn InputReader<TdPyAny>>,
>,
>;
fn get_subclass(&self, py: Python) -> StringResult<Self::Children> {
if let Ok(conf) = self.extract::<ManualInputConfig>(py) {
Ok(Box::new(conf))
} else if let Ok(conf) = self.extract::<KafkaInputConfig>(py) {
Ok(Box::new(conf))
} else {
let pytype = self.as_ref(py).get_type();
Err(format!("Unknown input_config type: {pytype}"))
}
}
}
Final edit: The end goal of this was to write a blanket implementation of PyBuilder
for any struct that implement ParentClass
with Children = Box<dyn PyBuilder>
, but it doesn't work as it is. I'm going to write it here as a reference, maybe I was close enough:
impl<T, B> PyBuilder for T
where
B: PyBuilder,
T: ParentClass<Children = Box<B>>,
{
type Input = B::Input;
type Output = B::Output;
fn build(&self, py: Python, params: Self::Input) -> StringResult<Self::Output> {
self.get_subclass(py)?.build(py, params)
}
}