Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -1554,7 +1554,6 @@ class BeamModulePlugin implements Plugin<Project> {
"InvalidThrows",
"JavaTimeDefaultTimeZone",
"JavaUtilDate",
"JodaConstructors",
"MalformedInlineTag",
"MissingSummary",
"MixedMutabilityReturnType",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public static void main(String[] args) {
// starts.
Duration windowSize = Duration.standardSeconds(WINDOW_TIME);
Instant nextWindowStart =
new Instant(
Instant.ofEpochMilli(
Instant.now().getMillis()
+ windowSize.getMillis()
- Instant.now().plus(windowSize).getMillis() % windowSize.getMillis());
Comment on lines 119 to 121
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Calling Instant.now() multiple times within the same expression can lead to inconsistencies if the system clock ticks between the calls. It's safer to call it once and store the result in a local variable before using it. For example:

final Instant now = Instant.now();
Instant nextWindowStart =
    Instant.ofEpochMilli(
        now.getMillis()
            + windowSize.getMillis()
            - now.plus(windowSize).getMillis() % windowSize.getMillis());

Expand Down Expand Up @@ -156,7 +156,8 @@ public void run() {
.apply(
GenerateSequence.from(0)
.withRate(MESSAGES_COUNT, Duration.standardSeconds(WINDOW_TIME))
.withTimestampFn((Long n) -> new Instant(System.currentTimeMillis())))
.withTimestampFn(
(Long n) -> Instant.ofEpochMilli(System.currentTimeMillis())))
.apply(ParDo.of(new RandomUserScoreGeneratorFn()));
input.apply(
KafkaIO.<String, Integer>write()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ static class AddTimestampFn extends DoFn<String, String> {
@ProcessElement
public void processElement(@Element String element, OutputReceiver<String> receiver) {
Instant randomTimestamp =
new Instant(
Instant.ofEpochMilli(
ThreadLocalRandom.current()
.nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis()));

Expand Down Expand Up @@ -189,8 +189,8 @@ public interface Options

static void runWindowedWordCount(Options options) throws IOException {
final String output = options.getOutput();
final Instant minTimestamp = new Instant(options.getMinTimestampMillis());
final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis());
final Instant minTimestamp = Instant.ofEpochMilli(options.getMinTimestampMillis());
final Instant maxTimestamp = Instant.ofEpochMilli(options.getMaxTimestampMillis());

Pipeline pipeline = Pipeline.create(options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void processElement(ProcessContext c) {
String userName = (String) row.get("contributor_username");
if (userName != null) {
// Sets the implicit timestamp field to be used in windowing.
c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
c.outputWithTimestamp(userName, Instant.ofEpochMilli(timestamp * 1000L));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ public void processElement(DoFn<String, String>.ProcessContext c) throws Excepti
if (items.length > 0) {
try {
String timestamp = items[0];
c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
c.outputWithTimestamp(
c.element(), Instant.ofEpochMilli(dateTimeFormat.parseMillis(timestamp)));
} catch (IllegalArgumentException e) {
// Skip the invalid input.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ public void processElement(DoFn<String, String>.ProcessContext c) throws Excepti
String timestamp = tryParseTimestamp(items);
if (timestamp != null) {
try {
c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
c.outputWithTimestamp(
c.element(), Instant.ofEpochMilli(dateTimeFormat.parseMillis(timestamp)));
} catch (IllegalArgumentException e) {
// Skip the invalid input.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ public static void main(String[] args) throws Exception {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);

final Instant stopMinTimestamp = new Instant(minFmt.parseMillis(options.getStopMin()));
final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));
final Instant stopMinTimestamp = Instant.ofEpochMilli(minFmt.parseMillis(options.getStopMin()));
final Instant startMinTimestamp =
Instant.ofEpochMilli(minFmt.parseMillis(options.getStartMin()));

// Read 'gaming' events from a text file.
pipeline
Expand Down Expand Up @@ -161,7 +162,7 @@ public static void main(String[] args) throws Exception {
// Add an element timestamp based on the event log, and apply fixed windowing.
.apply(
"AddEventTimestamps",
WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
WithTimestamps.of((GameActionInfo i) -> Instant.ofEpochMilli(i.getTimestamp())))
.apply(
"FixedWindowsTeam",
Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration()))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ public void processElement(ProcessContext c) throws Exception {
int range = MAX_DELAY - MIN_DELAY;
int delayInMinutes = random.nextInt(range) + MIN_DELAY;
long delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes);
timestamp = new Instant(timestamp.getMillis() - delayInMillis);
timestamp = Instant.ofEpochMilli(timestamp.getMillis() - delayInMillis);
}
c.outputWithTimestamp(c.element(), timestamp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -802,21 +802,21 @@ public static void main(String[] args) {
Create.timestamped(
TimestampedValue.of(
new TableRow().set("user", "mobile").set("score", 12).set("gap", 5),
new Instant()),
Instant.now()),
TimestampedValue.of(
new TableRow().set("user", "desktop").set("score", 4), new Instant()),
new TableRow().set("user", "desktop").set("score", 4), Instant.now()),
TimestampedValue.of(
new TableRow().set("user", "mobile").set("score", -3).set("gap", 5),
new Instant().plus(Duration.millis(2000))),
Instant.now().plus(Duration.millis(2000))),
TimestampedValue.of(
new TableRow().set("user", "mobile").set("score", 2).set("gap", 5),
new Instant().plus(Duration.millis(9000))),
Instant.now().plus(Duration.millis(9000))),
TimestampedValue.of(
new TableRow().set("user", "mobile").set("score", 7).set("gap", 5),
new Instant().plus(Duration.millis(12000))),
Instant.now().plus(Duration.millis(12000))),
TimestampedValue.of(
new TableRow().set("user", "desktop").set("score", 10),
new Instant().plus(Duration.millis(12000)))));
Instant.now().plus(Duration.millis(12000)))));
Comment on lines 803 to +819
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There are multiple calls to Instant.now() here. This will result in slightly different timestamps for each TimestampedValue. If the intention is for these events to be relative to a single point in time, it would be better to call Instant.now() once and reuse the value. For example:

final Instant now = Instant.now();
PCollection<TableRow> pc =
    p.apply(
        "Create Events",
        Create.timestamped(
            TimestampedValue.of(
                new TableRow().set("user", "mobile").set("score", 12).set("gap", 5),
                now),
            TimestampedValue.of(
                new TableRow().set("user", "desktop").set("score", 4), now),
            TimestampedValue.of(
                new TableRow().set("user", "mobile").set("score", -3).set("gap", 5),
                now.plus(Duration.millis(2000))),
            // ... and so on
            ));

// [END CustomSessionWindow5]

// [START CustomSessionWindow6]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ private void testWindowedWordCountPipeline(WindowedWordCountITOptions options) t

for (int startMinute : ImmutableList.of(0, 10, 20, 30, 40, 50)) {
final Instant windowStart =
new Instant(options.getMinTimestampMillis()).plus(Duration.standardMinutes(startMinute));
Instant.ofEpochMilli(options.getMinTimestampMillis())
.plus(Duration.standardMinutes(startMinute));
String filePrefix =
filenamePolicy.filenamePrefixForWindow(
new IntervalWindow(windowStart, windowStart.plus(Duration.standardMinutes(10))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ public void testTinyAutoComplete() {
public void testWindowedAutoComplete() {
List<TimestampedValue<String>> words =
Arrays.asList(
TimestampedValue.of("xA", new Instant(1)),
TimestampedValue.of("xA", new Instant(1)),
TimestampedValue.of("xB", new Instant(1)),
TimestampedValue.of("xB", new Instant(2)),
TimestampedValue.of("xB", new Instant(2)));
TimestampedValue.of("xA", Instant.ofEpochMilli(1)),
TimestampedValue.of("xA", Instant.ofEpochMilli(1)),
TimestampedValue.of("xB", Instant.ofEpochMilli(1)),
TimestampedValue.of("xB", Instant.ofEpochMilli(2)),
TimestampedValue.of("xB", Instant.ofEpochMilli(2)));

PCollection<String> input = p.apply(Create.timestamped(words));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class HourlyTeamScoreTest implements Serializable {
@Test
public void testUserScoresFilter() throws Exception {

final Instant startMinTimestamp = new Instant(1447965680000L);
final Instant startMinTimestamp = Instant.ofEpochMilli(1447965680000L);

PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
public class LeaderBoardTest implements Serializable {
private static final Duration ALLOWED_LATENESS = Duration.standardHours(1);
private static final Duration TEAM_WINDOW_DURATION = Duration.standardMinutes(20);
private Instant baseTime = new Instant(0);
private Instant baseTime = Instant.ofEpochMilli(0);

@Rule public TestPipeline p = TestPipeline.create();
/** Some example users, on two separate teams. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
@RunWith(JUnit4.class)
public class StatefulTeamScoreTest {

private Instant baseTime = new Instant(0);
private Instant baseTime = Instant.ofEpochMilli(0);

@Rule public TestPipeline p = TestPipeline.create();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,17 @@ public class TriggerExampleTest {
"01/01/2010 00:00:00,1108302,5,W,ML,36,100,30,0.0065,66,9,1,0.001,"
+ "74.8,1,9,3,0.0028,71,1,9,12,0.0099,87.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,,"
+ ",,0",
new Instant(60000)),
Instant.ofEpochMilli(60000)),
TimestampedValue.of(
"01/01/2010 00:00:00,1108302,110,E,ML,36,100,40,0.0065,66,9,1,0.001,"
+ "74.8,1,9,3,0.0028,71,1,9,12,0.0099,67.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,,"
+ ",,0",
new Instant(1)),
Instant.ofEpochMilli(1)),
TimestampedValue.of(
"01/01/2010 00:00:00,1108302,110,E,ML,36,100,50,0.0065,66,9,1,"
+ "0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,97.4,1,9,13,0.0121,50.0,1,,,,,0,,,,,0"
+ ",,,,,0,,,,,0",
new Instant(1)));
Instant.ofEpochMilli(1)));

private static final TableRow OUT_ROW_1 =
new TableRow()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ final class ReadFromTwitterDoFn extends DoFn<TwitterConfig, String> {
private final DateTime startTime;

ReadFromTwitterDoFn() {
this.startTime = new DateTime();
this.startTime = DateTime.now();
}
/* Logger for class.*/
private static final Logger LOG = LoggerFactory.getLogger(ReadFromTwitterDoFn.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#
# Java code snippet to generate example bytes:
# Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
# Instant now = new Instant(1000L);
# Instant now = Instant.ofEpochMilli(1000L);
# Timer<String> timer = Timer.of(
# "key",
# "tag",
Expand Down Expand Up @@ -597,7 +597,7 @@ examples:

# Java code snippet to generate example bytes:
# TimestampPrefixingWindowCoder<IntervalWindow> coder = TimestampPrefixingWindowCoder.of(IntervalWindowCoder.of());
# Instant end = new Instant(-9223372036854410L);
# Instant end = Instant.ofEpochMilli(-9223372036854410L);
# Duration span = Duration.millis(365L);
# IntervalWindow window = new IntervalWindow(end.minus(span), span);
# byte[] bytes = CoderUtils.encodeToByteArray(coder, window);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
Matcher<T> valueMatcher, long timestamp, long windowStart, long windowEnd) {
IntervalWindow intervalWindow =
new IntervalWindow(new Instant(windowStart), new Instant(windowEnd));
new IntervalWindow(Instant.ofEpochMilli(windowStart), Instant.ofEpochMilli(windowEnd));
return WindowMatchers.isSingleWindowedValue(
valueMatcher,
Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)), timestamp),
Matchers.describedAs("%0", Matchers.equalTo(Instant.ofEpochMilli(timestamp)), timestamp),
Matchers.equalTo(intervalWindow),
Matchers.anything());
}
Expand All @@ -138,10 +138,10 @@ public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
long windowEnd,
PaneInfo paneInfo) {
IntervalWindow intervalWindow =
new IntervalWindow(new Instant(windowStart), new Instant(windowEnd));
new IntervalWindow(Instant.ofEpochMilli(windowStart), Instant.ofEpochMilli(windowEnd));
return WindowMatchers.isSingleWindowedValue(
valueMatcher,
Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)), timestamp),
Matchers.describedAs("%0", Matchers.equalTo(Instant.ofEpochMilli(timestamp)), timestamp),
Matchers.equalTo(intervalWindow),
Matchers.equalTo(paneInfo));
}
Expand Down Expand Up @@ -186,7 +186,8 @@ public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
}

public static Matcher<IntervalWindow> intervalWindow(long start, long end) {
return Matchers.equalTo(new IntervalWindow(new Instant(start), new Instant(end)));
return Matchers.equalTo(
new IntervalWindow(Instant.ofEpochMilli(start), Instant.ofEpochMilli(end)));
}

public static <T> Matcher<WindowedValue<? extends T>> valueWithPaneInfo(final PaneInfo paneInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public static ByteString encodeInt64Gauge(GaugeData data) {
public static GaugeData decodeInt64Gauge(ByteString payload) {
InputStream input = payload.newInput();
try {
Instant timestamp = new Instant(VARINT_CODER.decode(input));
Instant timestamp = Instant.ofEpochMilli(VARINT_CODER.decode(input));
return GaugeData.create(VARINT_CODER.decode(input), timestamp);
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public AfterDelayFromFirstElementStateMachine alignedTo(
* the epoch.
*/
public AfterDelayFromFirstElementStateMachine alignedTo(final Duration size) {
return alignedTo(size, new Instant(0));
return alignedTo(size, Instant.ofEpochMilli(0));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private static TriggerStateMachine stateMachineForAfterProcessingTime(
stateMachine =
stateMachine.alignedTo(
Duration.millis(transform.getAlignTo().getPeriod()),
new Instant(transform.getAlignTo().getOffset()));
Instant.ofEpochMilli(transform.getAlignTo().getOffset()));
break;
case DELAY:
stateMachine =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void testCallbackRegistration() {
InMemoryBundleFinalizer finalizer = new InMemoryBundleFinalizer();
// Check when nothing has been registered
assertThat(finalizer.getAndClearFinalizations(), is(empty()));
finalizer.afterBundleCommit(new Instant(), () -> {});
finalizer.afterBundleCommit(Instant.now(), () -> {});
assertThat(finalizer.getAndClearFinalizations(), hasSize(1));
// Check that it is empty
assertThat(finalizer.getAndClearFinalizations(), is(empty()));
Expand Down
Loading
Loading