1. Background
At work I ran into a requirement: the output of a Spark Streaming job had to be written into a Hive table. The first option, Spark Streaming's saveAsTextFiles, creates a separate directory for every batch, which does not match the layout Hive expects when reading files, and its parameter only controls the directory name. The second option was Spark Streaming's foreachRDD, which hands each micro-batch over as an RDD that can be operated on directly; but since Spark Streaming processes data in many batches, i.e. many RDDs, each RDD's saveAsTextFile overwrites the data written by the previous one. The approach finally adopted was therefore to rewrite the file names that saveAsTextFile uses when writing its output.
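To make the overwrite problem concrete, here is a minimal sketch (the lines DStream and the output path are placeholders, not from the original job): every micro-batch writes into the same fixed directory, so the batches collide on the same part-NNNNN file names instead of accumulating.

// Assumption: lines is a DStream[String] coming from whatever source the job reads.
// Each micro-batch becomes one RDD; writing every RDD into the same directory means
// successive batches clobber each other's part files instead of accumulating output.
lines.foreachRDD { rdd =>
  rdd.saveAsTextFile("/warehouse/my_table")  // illustrative Hive table location
}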
2. Analysis
2.1 Analyzing the code
Since the plan is to override the output logic of saveAsTextFile, let's first look at how it actually writes its output:
def saveAsTextFile(path: String): Unit = withScope {
  // https://issues.apache.org/jira/browse/SPARK-2075
  //
  // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
  // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
  // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
  // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
  // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
  //
  // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
  // same bytecodes for `saveAsTextFile`.
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
As you can see, saveAsTextFile delegates the actual writing to saveAsHadoopFile. Because saveAsHadoopFile operates on a PairRDD, saveAsTextFile first converts the RDD into one of type (NullWritable, Text) via rddToPairRDDFunctions and then writes it out with saveAsHadoopFile.
So the output logic is still the standard Hadoop machinery, which means we can solve the duplicate output file name problem by plugging in our own OutputFormat in place of TextOutputFormat.
2.2 Writing the code
2.2.1 The saveAsHadoopFile operator
First, let's look at the official description of the saveAsHadoopFile operator:
/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD.
 */
def saveAsHadoopFile[F <: OutputFormat[K, V]](
    path: String)(implicit fm: ClassTag[F]): Unit = self.withScope {
  saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}

/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD. Compress the result with the
 * supplied codec.
 */
def saveAsHadoopFile[F <: OutputFormat[K, V]](
    path: String,
    codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit = self.withScope {
  val runtimeClass = fm.runtimeClass
  saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
}

/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
 */
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    codec: Class[_ <: CompressionCodec]): Unit = self.withScope {
  saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass,
    new JobConf(self.context.hadoopConfiguration), Some(codec))
}

/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD.
 *
 * Note that, we should make sure our tasks are idempotent when speculation is enabled, i.e. do
 * not use output committer that writes data directly.
 * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
 * result of using direct output committer with speculation enabled.
 */
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {...}
The overload we use here is def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = ..., codec: Option[Class[_ <: CompressionCodec]] = None).
We pass in, in order: path (the output path), keyClass (the key type), valueClass (the value type) and outputFormatClass (the OutputFormat to use); the remaining two parameters, conf and codec, keep their default values.
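For reference, spelled out with all six parameters the call we rely on is equivalent to the following sketch (pairRdd and outputPath are assumed names; the default values are taken straight from the source quoted above):

import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}

// Sketch only: pairRdd is an assumed RDD[(String, String)], outputPath an assumed output path.
pairRdd.saveAsHadoopFile(
  outputPath,                                       // path
  classOf[String],                                  // keyClass
  classOf[String],                                  // valueClass
  classOf[TextOutputFormat[String, String]],        // outputFormatClass (replaced by our own class in 2.2.3)
  new JobConf(pairRdd.context.hadoopConfiguration), // conf: its default value
  None)                                             // codec: its default value (no compression)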
2.2.2 Analyzing MultipleTextOutputFormat
MultipleTextOutputFormat extends Hadoop's MultipleOutputFormat, and the methods we care about live in that parent class, whose source is shown below:
/**
 * This abstract class extends the FileOutputFormat, allowing to write the
 * output data to different output files. There are three basic use cases for
 * this class.
 *
 * Case one: This class is used for a map reduce job with at least one reducer.
 * The reducer wants to write data to different files depending on the actual
 * keys. It is assumed that a key (or value) encodes the actual key (value)
 * and the desired location for the actual key (value).
 *
 * Case two: This class is used for a map only job. The job wants to use an
 * output file name that is either a part of the input file name of the input
 * data, or some derivation of it.
 *
 * Case three: This class is used for a map only job. The job wants to use an
 * output file name that depends on both the keys and the input file name,
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class MultipleOutputFormat<K, V>
    extends FileOutputFormat<K, V> {

  /**
   * Create a composite record writer that can write key/value data to different
   * output files
   *
   * @param fs
   *          the file system to use
   * @param job
   *          the job conf for the job
   * @param name
   *          the leaf file name for the output file (such as part-00000")
   * @param arg3
   *          a progressable for reporting progress.
   * @return a composite record writer
   * @throws IOException
   */
  public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job,
      String name, Progressable arg3) throws IOException {

    final FileSystem myFS = fs;
    final String myName = generateLeafFileName(name);
    final JobConf myJob = job;
    final Progressable myProgressable = arg3;

    return new RecordWriter<K, V>() {

      // a cache storing the record writers for different output files.
      TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<String, RecordWriter<K, V>>();

      public void write(K key, V value) throws IOException {

        // get the file name based on the key
        String keyBasedPath = generateFileNameForKeyValue(key, value, myName);

        // get the file name based on the input file name
        String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);

        // get the actual key
        K actualKey = generateActualKey(key, value);
        V actualValue = generateActualValue(key, value);

        RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
        if (rw == null) {
          // if we don't have the record writer yet for the final path, create
          // one and add it to the cache
          rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
          this.recordWriters.put(finalPath, rw);
        }
        rw.write(actualKey, actualValue);
      };

      public void close(Reporter reporter) throws IOException {
        Iterator<String> keys = this.recordWriters.keySet().iterator();
        while (keys.hasNext()) {
          RecordWriter<K, V> rw = this.recordWriters.get(keys.next());
          rw.close(reporter);
        }
        this.recordWriters.clear();
      };
    };
  }

  /**
   * Generate the leaf name for the output file name. The default behavior does
   * not change the leaf file name (such as part-00000)
   *
   * @param name
   *          the leaf file name for the output file
   * @return the given leaf file name
   */
  protected String generateLeafFileName(String name) {
    return name;
  }

  /**
   * Generate the file output file name based on the given key and the leaf file
   * name. The default behavior is that the file name does not depend on the
   * key.
   *
   * @param key
   *          the key of the output data
   * @param name
   *          the leaf file name
   * @return generated file name
   */
  protected String generateFileNameForKeyValue(K key, V value, String name) {
    return name;
  }

  /**
   * Generate the actual key from the given key/value. The default behavior is that
   * the actual key is equal to the given key
   *
   * @param key
   *          the key of the output data
   * @param value
   *          the value of the output data
   * @return the actual key derived from the given key/value
   */
  protected K generateActualKey(K key, V value) {
    return key;
  }

  /**
   * Generate the actual value from the given key and value. The default behavior is that
   * the actual value is equal to the given value
   *
   * @param key
   *          the key of the output data
   * @param value
   *          the value of the output data
   * @return the actual value derived from the given key/value
   */
  protected V generateActualValue(K key, V value) {
    return value;
  }

  /**
   * Generate the outfile name based on a given name and the input file name. If
   * the {@link JobContext#MAP_INPUT_FILE} does not exists (i.e. this is not for a map only job),
   * the given name is returned unchanged. If the config value for
   * "num.of.trailing.legs.to.use" is not set, or set 0 or negative, the given
   * name is returned unchanged. Otherwise, return a file name consisting of the
   * N trailing legs of the input file name where N is the config value for
   * "num.of.trailing.legs.to.use".
   *
   * @param job
   *          the job config
   * @param name
   *          the output file name
   * @return the outfile name based on a given name and the input file name.
   */
  protected String getInputFileBasedOutputFileName(JobConf job, String name) {
    String infilepath = job.get(MRJobConfig.MAP_INPUT_FILE);
    if (infilepath == null) {
      // if the {@link JobContext#MAP_INPUT_FILE} does not exists,
      // then return the given name
      return name;
    }
    int numOfTrailingLegsToUse = job.getInt("mapred.outputformat.numOfTrailingLegs", 0);
    if (numOfTrailingLegsToUse <= 0) {
      return name;
    }
    Path infile = new Path(infilepath);
    Path parent = infile.getParent();
    String midName = infile.getName();
    Path outPath = new Path(midName);
    for (int i = 1; i < numOfTrailingLegsToUse; i++) {
      if (parent == null) break;
      midName = parent.getName();
      if (midName.length() == 0) break;
      parent = parent.getParent();
      outPath = new Path(midName, outPath);
    }
    return outPath.toString();
  }

  /**
   *
   * @param fs
   *          the file system to use
   * @param job
   *          a job conf object
   * @param name
   *          the name of the file over which a record writer object will be
   *          constructed
   * @param arg3
   *          a progressable object
   * @return A RecordWriter object over the given file
   * @throws IOException
   */
  abstract protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
      JobConf job, String name, Progressable arg3) throws IOException;
}
As the code shows, before writing each record MultipleOutputFormat calls generateFileNameForKeyValue to decide which file the record goes to, so all we have to do is override generateFileNameForKeyValue.
2.2.3 Overriding MultipleTextOutputFormat
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Timestamp taken when the format is instantiated; used to prefix every output file name
  private val start_time = System.currentTimeMillis()

  // Prefix the default leaf name (e.g. part-00000) with the timestamp so that
  // files written by different batches no longer share the same name
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    val service_date = start_time + "-" + name
    service_date
  }
}
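Because generateFileNameForKeyValue also receives the key and value, the same hook can route records to different files per key, which is the "case one" usage described in the MultipleOutputFormat javadoc above. A hedged variant (the date-valued key and the Hive-style partition layout are assumptions, not part of the original requirement):

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Hypothetical variant: assume the key already carries a date string such as "2019-01-01",
// and write each record under a partition-style subdirectory named after that key.
class KeyBasedTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    // e.g. dt=2019-01-01/part-00000
    "dt=" + key.toString + "/" + name
  }
}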
2.2.4 Modifying the Spark Streaming code
...  // business logic
  .map(x => (x, ""))  // saveAsHadoopFile requires a PairRDD, so wrap each line into a (key, value) pair
  .saveAsHadoopFile(finallpath, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])
With this in place, the overwrite problem is solved.
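For completeness, here is a hedged end-to-end sketch of how the pieces fit together (the StreamingContext setup, the socket source and the finallpath value are assumptions for illustration; only the last two operators come from the snippet above):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamToHiveDir {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("stream-to-hive-dir"), Seconds(60))

    // Assumed source; any DStream[String] would be handled the same way
    val lines = ssc.socketTextStream("localhost", 9999)

    // Illustrative output path pointing at the Hive table's location
    val finallpath = "/warehouse/my_table"

    lines.foreachRDD { rdd =>
      rdd
        .map(x => (x, ""))  // saveAsHadoopFile needs a PairRDD
        .saveAsHadoopFile(finallpath, classOf[String], classOf[String],
          classOf[RDDMultipleTextOutputFormat])
    }

    ssc.start()
    ssc.awaitTermination()
  }
}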