
Report Beam Lineage from Parquet reads #5850

Draft
shnapz wants to merge 9 commits into main from akabas/parquet-lineage

Conversation

Contributor

shnapz commented Jan 2, 2026

No description provided.

shnapz commented Jan 2, 2026
): SCollection[Example] = {
val job = Job.getInstance(conf)
GcsConnectorUtil.setInputPaths(sc, job, path)
val filePattern = ScioUtil.filePattern(path, params.suffix)
Contributor Author

shnapz Jan 2, 2026


I am surprised `suffix` was not used initially. Or was it intentional?

codecov bot commented Jan 2, 2026 (edited)

Codecov Report

Patch coverage is 55.81395% with 19 lines in your changes missing coverage. Please review.
Project coverage is 61.56%. Comparing base (c234ba6) to head (b0a3741).
Report is 11 commits behind head on main.

Files with missing lines Patch % Lines
...ify/scio/parquet/tensorflow/ParquetExampleIO.scala 0.00% 7 Missing
...com/spotify/scio/parquet/types/ParquetTypeIO.scala 0.00% 7 Missing
...scala/com/spotify/scio/parquet/HadoopParquet.scala 81.48% 5 Missing
Additional details and impacted files
@@ Coverage Diff @@
## main #5850 +/- ##
==========================================
+ Coverage 61.49% 61.56% +0.06%
==========================================
Files 317 318 +1
Lines 11650 11678 +28
Branches 845 834 -11
==========================================
+ Hits 7164 7189 +25
- Misses 4486 4489 +3

View full report in Codecov by Sentry.

shnapz added 2 commits January 5, 2026 16:35
shnapz marked this pull request as ready for review January 5, 2026 22:18
shnapz commented Jan 7, 2026
shnapz marked this pull request as draft January 8, 2026 15:37
kellen reviewed Jan 8, 2026
Some(projectionFn),
None
)
.parDo(new LineageReportDoFn(filePattern))
Contributor

kellen Jan 8, 2026


Isn't this going to result in a new node in the graph? Why are we doing this in sequence with the read if it's not actually using any of the read elements? We should do it like the Scio init metrics, which is just its own distinct graph: create impulse -> submit parquet lineage.

Contributor Author

shnapz Jan 9, 2026


I am trying to follow Beam conventions and have this metric associated with the actual read transform. This way we keep transform-level lineage (which is supported in Beam).
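The idea of keeping the lineage report attached to the read step, rather than running it as a separate impulse graph, can be sketched as a toy pass-through stage. This is plain Scala, not Beam: `LineageReporter` and its `report` callback are hypothetical stand-ins for a `DoFn` and `FileSystems.reportSourceLineage`.

```scala
import scala.collection.mutable

// Toy model (not Beam code): a pass-through step that reports the file
// pattern it was constructed with exactly once, then forwards every element
// unchanged, so the lineage stays associated with the read step itself.
final class LineageReporter[T](filePattern: String, report: String => Unit) {
  private var reported = false

  def processElement(element: T): T = {
    if (!reported) {
      report(filePattern) // stand-in for FileSystems.reportSourceLineage
      reported = true
    }
    element // pass through unchanged
  }
}

object LineageReporterDemo {
  def main(args: Array[String]): Unit = {
    val seen = mutable.Buffer.empty[String]
    val step =
      new LineageReporter[Int]("gs://bucket/data/*.parquet", (s: String) => { seen += s; () })
    val out = List(1, 2, 3).map(step.processElement)
    println(out)  // elements pass through unchanged
    println(seen) // the pattern is reported exactly once
  }
}
```

The single-report flag mirrors why attaching the report to the read is cheap: the metric is emitted once per worker, not once per element.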

tracker.currentRestriction.getFrom,
if (splitGranularity == SplitGranularity.File) "end" else tracker.currentRestriction().getTo
)
FileSystems.reportSourceLineage(file.getMetadata.resourceId())
Contributor

kellen Jan 8, 2026


This is different from the Hadoop one insofar as we get every file here, right? That seems bad/annoying if we want to use the lineage for anything.

Contributor Author

shnapz Jan 9, 2026


Actually, file-level lineage is the default approach in Beam, though we might not need it directly. Both Lineage metric implementations (the legacy one and the new one) handle many files well:
- StringSet truncates internally at 100 entries
- BoundedTrie is a data structure that stores hierarchical data very efficiently
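To illustrate the BoundedTrie point, here is a toy prefix tree over path segments. This is plain Scala, not Beam's actual BoundedTrie (which additionally bounds its size): the idea is that many files under the same bucket and directory share a long prefix, which a trie stores once rather than repeating it per file.

```scala
import scala.collection.mutable

// Toy prefix tree keyed by path segment (hypothetical, for illustration only)
final class PathTrie {
  private class Node {
    val children = mutable.Map.empty[String, Node]
    var terminal = false // true if a full path ends at this node
  }
  private val root = new Node
  private var count = 0

  def add(path: String): Unit = {
    // Walk/create one node per non-empty path segment
    val node = path.split('/').filter(_.nonEmpty).foldLeft(root) { (n, seg) =>
      n.children.getOrElseUpdate(seg, new Node)
    }
    if (!node.terminal) { node.terminal = true; count += 1 }
  }

  // Number of distinct paths stored
  def size: Int = count

  // Number of stored segments; shared prefixes are counted once
  def nodeCount: Int = {
    def go(n: Node): Int = 1 + n.children.values.map(go).sum
    go(root) - 1 // exclude the synthetic root
  }
}

object PathTrieDemo {
  def main(args: Array[String]): Unit = {
    val t = new PathTrie
    Seq(
      "gs://bucket/data/part-00000.parquet",
      "gs://bucket/data/part-00001.parquet",
      "gs://bucket/data/part-00002.parquet"
    ).foreach(t.add)
    println(t.size)      // 3 distinct files
    println(t.nodeCount) // 6 nodes: the gs:/bucket/data prefix is stored once
  }
}
```

Three paths of four segments each would be twelve entries in a flat set, but only six trie nodes here, which is why file-level lineage over large globs stays manageable.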

override def apply(input: Void): java.lang.Boolean = true
})

val withSkipClone = skipValueClone.fold(hadoop)(skip => hadoop.withSkipValueClone(skip))
Contributor

kellen Jan 8, 2026


withSkipValueClone?

import java.util.concurrent.atomic.AtomicBoolean
import scala.reflect.ClassTag

private[parquet] object HadoopParquet {
Contributor

kellen Jan 8, 2026


Is this just to reduce duplication, or is there a functional change here?

Contributor Author

shnapz Jan 9, 2026


Just to reduce duplication, no new functionality, except I noticed that in some cases Scio's derived coder was not set on the HadoopFormatIO transform. Beam probably auto-derives the same coder, but it is better to set it explicitly anyway.


Reviewers

kellen kellen left review comments

2 participants