-
Notifications
You must be signed in to change notification settings - Fork 2.2k
[Feature][Connectors-v2] Refactor DateTime Utils and Enhance Time String Auto-Format#10486
[Feature][Connectors-v2] Refactor DateTime Utils and Enhance Time String Auto-Format#10486hawk9821 wants to merge 9 commits intoapache:devfrom
Conversation
Purpose of this pull request
-
Refactor time utility class and enhance automatic formatting of time strings , and optimize auto-parsing performance
-
Fix error caused by inconsistent single-column time string formats during file parsing in File-Connector
create_date
2026-01-01
2026/01/01
2026-1-1
2026-1-10
throw execption
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException: ErrorCode:[FILE-08], ErrorDescription:[File read failed] - Read data from this file [default.default.default_file:/data/app/seatunnel/seatunnel- 2.3.12/tmp.txt] failed
at org.apache.seatunnel.connectors.seatunnel.file.source.reader .MultipleTableFileSourceReader.pollNext(MultipleTableFileSou rceReader.java:85) ~[?:?]
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeC ycle.collect(SourceFlowLifeCycle.java:159) ~[seatunnel-starter.jar:2.3.9]
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask. collect(SourceSeaTunnelTask.java:127) ~[seatunnel-starter.jar:2.3.9]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateP rocess(SeaTunnelTask.java:169) ~[seatunnel-starter.jar:2.3.9]
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask. call(SourceSeaTunnelTask.java:132) ~[seatunnel-starter.jar:2.3.9]
at org.apache.seatunnel.engine.server.TaskExecutionService$Bloc kingWorker.run(TaskExecutionService.java:694) ~[seatunnel-starter.jar:2.3.9]
at org.apache.seatunnel.engine.server.TaskExecutionService$Name dTaskWrapper.run(TaskExecutionService.java:1019) ~[seatunnel-starter.jar:2.3.9]
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable .java:43) ~[seatunnel-starter.jar:2.3.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Executor s.java:511) ~[?:1.8.0_161]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool Executor.java:1149) ~[?:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo lExecutor.java:624) ~[?:1.8.0_161]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
Caused by: java.time.format.DateTimeParseException: Text '2026/01/01' could not be parsed at index 4
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFo rmatter.java:1949) ~[?:1.8.0_161]
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.j ava:1777) ~[?:1.8.0_161]
at org.apache.seatunnel.format.text.TextDeserializationSchema.c onvert(TextDeserializationSchema.java:318) ~[?:?]
at org.apache.seatunnel.format.text.TextDeserializationSchema.d eserialize(TextDeserializationSchema.java:188) ~[?:?]
at org.apache.seatunnel.format.text.TextDeserializationSchema.d eserialize(TextDeserializationSchema.java:60) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.file.source.reader .TextReadStrategy.lambda$readProcess$0(TextReadStrategy.java :108) ~[?:?]
at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_161]
at java.util.Spliterators$IteratorSpliterator.forEachRemaining( Spliterators.java:1801) ~[?:1.8.0_161]
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePip eline.java:580) ~[?:1.8.0_161]
at org.apache.seatunnel.connectors.seatunnel.file.source.reader .TextReadStrategy.readProcess(TextReadStrategy.java:104) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.file.source.reader .AbstractReadStrategy.resolveArchiveCompressedInputStream(Ab stractReadStrategy.java:268) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.file.source.reader .TextReadStrategy.read(TextReadStrategy.java:71) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.file.source.reader .MultipleTableFileSourceReader.pollNext(MultipleTableFileSou rceReader.java:81) ~[?:?]
... 12 more
- Fix user-defined configuration :
date_format,datetime_format,time_formatdoes not take effect.
Does this PR introduce any user-facing change?
no
How was this patch tested?
Added unit and e2e test cases.
Check list
- If any new Jar binary package adding in your PR, please add License Notice according
New License Guide - If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
- If necessary, please update
incompatible-changes.mdto describe the incompatibility caused by this PR. - If you are contributing the connector code, please check that the following files are updated:
- Update plugin-mapping.properties and add new connector information in it
- Update the pom file of seatunnel-dist
- Add ci label in label-scope-conf
- Add e2e testcase in seatunnel-e2e
- Update connector plugin_config
Issue 1: Error message not clear enough after re-matching failsLocation: Related context:
Problem description: Potential risks:
Impact scope:
Severity: MINOR Improvement suggestion: // DateTimeParseHelper.java:88-94
if (isUserConfigured) { throw errorSupplier.get(fieldVal, fieldName); } // Record original format information before retry DateTimeFormatter originalFormatter = fieldFormatterCache.get(fieldName); formatter = autoFormatterSupplier.get(fieldVal); if (formatter == null) { // Improve error message to indicate that automatic matching was attempted but failed throw new SeaTunnelRuntimeException( CommonErrorCode.FORMAT_DATETIME_ERROR, Map.of( "datetime", fieldVal, "field", fieldName, "reason", "Auto-match failed after initial format mismatch: " + (originalFormatter != null ? "cached format incompatible" : "no matching pattern") ) ); } Rationale: Provide more context to help users understand whether they need to explicitly configure a format or clean the data. Issue 2: FormatterConfig.getPatternStr() uses instanceof chain, violating Open-Closed PrincipleLocation: Related context:
Problem description: Potential risks:
Impact scope:
Severity: MINOR Improvement suggestion: // Add to Formatter interface
public interface Formatter<T> { String getValue(); String getPattern(); // New addition } // In each Formatter implementation public enum Formatter implements Formatter<Formatter> { YYYY_MM_DD("yyyy-MM-dd"); @Override public String getPattern() { return getValue(); // Default implementation } } // Simplify getPatternStr default String getPatternStr(FormatterConfig> formatterConfig) { return formatterConfig.getFormatter().getPattern(); } Rationale: Leverage polymorphism to avoid type checking, complying with the Open-Closed Principle. Issue 3: Missing documentation and CHANGELOGLocation:
Related context:
Problem description:
Potential risks:
Impact scope:
Severity: MAJOR (user experience issue) Improvement suggestion:
** Reason**: Follow Apache project specifications to ensure smooth user upgrades. ## Issue 4: Caching in concurrent scenarios may cause race conditions** Location**: ** Related context**:
** Problem description**: DateTimeFormatter formatter = fieldFormatterCache.get(fieldName); // read
if (formatter == null) { // ... compute new formatter ... fieldFormatterCache.put(fieldName, formatter); // write } Sui Ran
** Potential risks**:
** Impact scope**:
** Severity**: MINOR (In actual scenarios, field parsing for the first time is usually single-threaded, but the design is not robust enough) ** Improvement suggestions**: // Use computeIfAbsent to guarantee atomicity
DateTimeFormatter formatter = fieldFormatterCache.computeIfAbsent(fieldName, key -> { if (isUserConfigured) { String pattern = getPatternStr(formatterConfig); return DateTimeFormatter.ofPattern(pattern); } else { DateTimeFormatter matched = autoFormatterSupplier.get(fieldVal); if (matched == null) { throw errorSupplier.get(fieldVal, fieldName); } return matched; } }); // Parse phase try { return parser.parse(fieldVal, formatter); } catch (Exception e) { if (isUserConfigured) { throw errorSupplier.get(fieldVal, fieldName); } // Re-match: replace cached formatter DateTimeFormatter newFormatter = autoFormatterSupplier.get(fieldVal); if (newFormatter == null) { throw errorSupplier.get(fieldVal, fieldName); } // Atomic replacement (note: there may be concurrency issues here, but rare in actual scenarios) fieldFormatterCache.replace(fieldName, formatter, newFormatter); return parser.parse(fieldVal, newFormatter); } ** Reason**: Use ## Issue 5: Time format parsing lacks unified automatic matching support** Location**: ** Related context**:
** Problem description**: Shi Jian Ge Shi De Duo Yang Xing (HH:mm:ss vs H:mm:ss Deng )Ye Zhi De Lei Si You Hua . ** Potential risks**:
** Impact scope**:
** Severity**: MINOR (Performance optimization opportunity) ** Improvement suggestions**: // TimeUtils.java
private static final Map<Integer, List<TimePattern>> TIME_PATTERN_MAP = new HashMap<>(); private static final int TIME_LENGTH_THRESHOLD = 12; // e.g. "HH:mm:ss.SSSSSSSSS" static { initTimePatternMap(); } private static void initTimePatternMap() { // Group by string length: 8 (HH:mm:ss), 9 (H:mm:ss), 12 (HH:mm:ss.SSSSSSSSS), etc. List<TimePattern> length8Patterns = new ArrayList<>(); length8Patterns.add(new TimePattern("\\d{2}:\\d{2}:\\d{2}", Formatter.HH_MM_SS)); TIME_PATTERN_MAP.put(8, length8Patterns); // ... other lengths } public static DateTimeFormatter matchTimeFormatter(String timeStr) { if (timeStr == null || timeStr.isEmpty()) { throw new IllegalArgumentException("Time string cannot be null or empty"); } int strLength = timeStr.length(); List<TimePattern> timePatterns = TIME_PATTERN_MAP.getOrDefault(strLength, Collections.emptyList()); for (TimePattern pattern : timePatterns) { if (pattern.getPattern().matcher(timeStr).matches()) { return pattern.getFormatter(); } } return null; // or return match from extra-long group } Rationale: Maintain architectural consistency, improve time field parsing performance. |
|
@hawk9821 Do the following time formats of data support 1/2/2026 12:01:30 |
1c7a97a to
d84f418
Compare
Supported |
[Feature][Connector-v2]Time Format Extension
# Conflicts:
# seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
# seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java